Page History
MVC WEBSOCKET 을 설정하는 방법은 이전장에서 소개가 되었으며
여기서는 WebFlux가 지원하는 ReactiveWebSocket을 살펴보겠습니다WebFlux가 지원하는 ReactiveWebSocket을 Spring Boot 기본 Websocket(MVC)과 구현방식과 작동방식의 차이를 알아보겠습니다.
다음과 같은 큰 차이가 있기때문에, 적은 자원으로 웹소켓 커넥션 고성능 효율을 가지려면 Reactive-WebFlux모드가 권장됩니다.
...
- WebFluxConfigurer : static web 리소스를 이용할때 설정이 필요합니다. ( testclient.html )
- ReactiveWebSocketConfig : reactive socket의 ws endpoint를 핸들러와 함께 설정합니다.
SessionManager
커넥션 관리를 기본적으로 하고 PUB/SUB및 SendToSession 이 기능이 있는 소켓을 관리하는 기본 객체로~
외부 MQ와 연결해 확장해 나갈수 있습니다.이 Class의 추가미션으로 유효하지 않은 소켓세션을 빠르게 제거해 소켓세션의 자원을 효율적으로 관리할수도 있습니다.
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom import com.example.kotlinbootlabs.ws.handler.auth.MessageType import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage import org.springframework.web.socket.WebSocketSession import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap @Component class WebSocketSessionManager { private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java) val sessions = ConcurrentHashMap<String, WebSocketSession>() val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>() fun addSession(session: WebSocketSession) { sessions[session.id] = session logger.info("Connected: ${session.id}") } fun removeSession(session: WebSocketSession) { sessions.remove(session.id) logger.info("Disconnected: ${session.id}") } fun subscribeToTopic(sessionId: String, topic: String) { topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId) logger.info("Session $sessionId subscribed to topic $topic") } fun unsubscribeFromTopic(sessionId: String, topic: String) { topicSubscriptions[topic]?.remove(sessionId) logger.info("Session $sessionId unsubscribed from topic $topic") } fun sendMessageToSession(sessionId: String, message: String) { sessions[sessionId]?.let { sendEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } fun sendMessageToTopic(topic: String, message: String) { topicSubscriptions[topic]?.forEach { sessionId -> sessions[sessionId]?.let { sendEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } } } |
...
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage import org.example.kotlinbootreactivelabs.ws.model.MessageFrom import org.example.kotlinbootreactivelabs.ws.model.MessageType import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap @Component class WebSocketSessionManager { private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java) val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>() val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>() fun addSession(session: ReactiveWebSocketSession) { reactiveSessions[session.id] = session logger.info("[SessionManager] Connected: ${session.id}") sendReactiveEventTextMessage(session, EventTextMessage( type = MessageType.INFO, message = "You are connected - ${session.id}", from = MessageFrom.SYSTEM, id = null, jsondata = null, )) } fun removeSession(session: ReactiveWebSocketSession) { reactiveSessions.remove(session.id) logger.info("[SessionManager] Disconnected: ${session.id}") } fun subscribeReactiveToTopic(sessionId: String, topic: String) { topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId) logger.info("Session $sessionId subscribed to topic $topic") } fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) { topicSubscriptions[topic]?.remove(sessionId) logger.info("Session $sessionId unsubscribed from topic $topic") } fun sendReactiveMessageToSession(sessionId: String, message: String) { reactiveSessions[sessionId]?.let { sendReactiveEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } fun sendReactiveMessageToTopic(topic: String, message: String) { topicSubscriptions[topic]?.forEach { sessionId -> reactiveSessions[sessionId]?.let { sendReactiveEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } } } |
...
- https://githubdocs.spring.comio/psmon/javaspring-labsintegration/reference/reactive-streams.html#reactive-channel-adapters
- reactive가 지원하는 reactive adapter를 이용해 pub/sub stream 처리를 더 우아하게 할것으로 기대해봅니다.
- https://docs.spring.io/spring-integration/reference/amqp.html
Reactive 모드로 작성된 소켓 pub/sub/sentoSession 위한 TEST 툴도 준비되어있으며 전체코드도 포함되어있습니다.
- 인증모드는 제외되었으며 API를 통해 pub/sub/sendToSession 기능을 모두 수행할수 있습니다.
- 전체 코드 : https://github.com/psmon/java-labs/tree/master/KotlinBootReactiveLabs/src/tree/master/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/ws
- MCV Websocket을 다양하게 이용하기 : MVC WEBSOCKET SocketHandeler 처리방식만 다르며 다양한 변종기능을 적용할수 있습니다.