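This post compares the Reactive WebSocket supported by WebFlux with Spring Boot's default (MVC) WebSocket, looking at how each is implemented and how each operates.
Because of the major differences listed below, Reactive (WebFlux) mode is recommended when you want to serve WebSocket connections efficiently with few resources.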
Spring MVC
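- Uses the traditional thread-based model.
- Each request is handled by a thread that is either newly created or taken from a thread pool.
- Runs on blocking I/O.
- WebSocket handlers are processed synchronously.
Spring WebFlux (Reactive)
- Uses the reactive programming model.
- Requests are processed on asynchronous, non-blocking I/O.
- Makes use of reactive streams and the Publisher-Subscriber pattern.
- WebSocket handlers are processed asynchronously, which provides high concurrency and scalability.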
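Can MVC WebSocket and Reactive WebSocket run side by side inside one Spring Boot application, separated only by interface? I ran a variety of tests, but
because the two run on different runtime models (a thread pool for MVC, a dispatcher for Reactive), a Spring Boot application can run only one of them at runtime.
- MVC WebSocket and Reactive WebSocket cannot work together in the same Spring Boot application. The same applies to REST APIs.
- To run in Reactive mode, the Spring Boot application must start in Reactive (WebFlux) mode. The sample uses the @EnableWebFlux annotation for this.
From here on, WebFlux is referred to as Reactive, and the rest of the post explains how the MVC and Reactive implementations differ.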
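Basic features to implement
Using the Reactive WebSocket, we will implement only the basics: local connection management, pub/sub over local sessions, and SendToSession.
- Placing a local SocketSession management layer in between lets you choose among various MQs later, when distributed state has to be managed.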
Dependency differences
MVC
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")

import org.springframework.web.socket.handler.TextWebSocketHandler
- Uses the classes under web.socket.
Reactive
implementation("org.springframework.boot:spring-boot-starter-webflux")

import org.springframework.web.reactive.socket.WebSocketHandler
- Uses the classes under web.reactive.socket.
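Note: if you include both org.springframework.boot:spring-boot-starter-web and org.springframework.boot:spring-boot-starter-webflux, the build still succeeds,
but at runtime MVC takes precedence. The programming models also differ, so one cannot simply be switched to the other.
When setting up the project, adopt only one of the two and decide early whether to go with MVC or Reactive mode.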
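As a minimal sketch of making that choice explicit: Spring Boot's SpringApplication has a web application type, and setting it to REACTIVE starts the reactive server even when Spring MVC is also on the classpath. This does not make MVC and WebFlux run together; it only selects one of them. The class name ReactiveLabsApplication is a placeholder and is not taken from the original project.

```kotlin
import org.springframework.boot.WebApplicationType
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

// Placeholder application class (assumption, not from the original project).
@SpringBootApplication
class ReactiveLabsApplication

fun main(args: Array<String>) {
    runApplication<ReactiveLabsApplication>(*args) {
        // Force the reactive (WebFlux) runtime instead of relying on classpath detection.
        webApplicationType = WebApplicationType.REACTIVE
    }
}
```

The same setting is available as the property spring.main.web-application-type=reactive.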
Message model
import com.fasterxml.jackson.annotation.JsonFormat

enum class MessageType {
    CHAT,         // For Chat
    CHATBLOCK,    // For ChatBot Block
    PUSH,         // For Push Notification
    INFO, ERROR,  // For SystemMessage
    SESSIONID     // For Session ID Update
}

enum class MessageFrom {
    USER, COUNSELOR, SYSTEM
}

class EventTextMessage(
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val type: MessageType,
    val message: String,
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val from: MessageFrom,
    var id: String? = null,
    val jsondata: String? = null
)
- Agreeing on the message contract with the front end (.js) before starting makes message handling flexible.
SocketHandler implementation
MVC
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.socket.CloseStatus
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler

@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    override fun afterConnectionEstablished(session: WebSocketSession) {
        sessionManager.addSession(session)
    }

    override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
        sessionManager.removeSession(session)
    }

    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val payload = message.payload
        when {
            payload.startsWith("subscribe:") -> {
                val topic = payload.substringAfter("subscribe:")
                sessionManager.subscribeToTopic(session.id, topic)
            }
            payload.startsWith("unsubscribe:") -> {
                val topic = payload.substringAfter("unsubscribe:")
                sessionManager.unsubscribeFromTopic(session.id, topic)
            }
            else -> {
                // Anything that is not a subscribe/unsubscribe command is echoed back
                sendEventTextMessage(session, EventTextMessage(
                    type = MessageType.CHAT,
                    message = "Echo: $payload",
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                ))
            }
        }
    }
}
Reactive
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

        return session.receive()
            .map { it.payloadAsText }
            .flatMap { payload ->
                when {
                    payload.startsWith("subscribe:") -> {
                        val topic = payload.substringAfter("subscribe:")
                        sessionManager.subscribeReactiveToTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    payload.startsWith("unsubscribe:") -> {
                        val topic = payload.substringAfter("unsubscribe:")
                        sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    else -> {
                        // Anything that is not a subscribe/unsubscribe command is echoed back
                        sendReactiveEventTextMessage(
                            session, EventTextMessage(
                                type = MessageType.CHAT,
                                message = "Echo: $payload",
                                from = MessageFrom.SYSTEM,
                                id = null,
                                jsondata = null,
                            )
                        )
                        Mono.empty<Void>()
                    }
                }
            }
            .then()
            // Runs on completion, error, or cancellation, so the session is always removed
            .doFinally { sessionManager.removeSession(session) }
    }
}
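Both handlers dispatch on the same plain-text prefixes (subscribe: and unsubscribe:), with everything else treated as chat. As a sketch of how that shared step could be pulled out and tested without a socket, the parsing can be written as a pure function. ClientCommand and parseCommand are hypothetical names introduced here for illustration; they are not part of the original project.

```kotlin
// Hypothetical names: ClientCommand and parseCommand are not part of the original project.
sealed interface ClientCommand {
    data class Subscribe(val topic: String) : ClientCommand
    data class Unsubscribe(val topic: String) : ClientCommand
    data class Chat(val text: String) : ClientCommand
}

// Maps a raw text payload to a command, using the same prefixes as the handlers.
fun parseCommand(payload: String): ClientCommand = when {
    payload.startsWith("subscribe:") ->
        ClientCommand.Subscribe(payload.substringAfter("subscribe:"))
    payload.startsWith("unsubscribe:") ->
        ClientCommand.Unsubscribe(payload.substringAfter("unsubscribe:"))
    else -> ClientCommand.Chat(payload)
}

fun main() {
    println(parseCommand("subscribe:news"))   // Subscribe(topic=news)
    println(parseCommand("unsubscribe:news")) // Unsubscribe(topic=news)
    println(parseCommand("hello"))            // Chat(text=hello)
}
```

With this in place, the MVC handler and the Reactive handler would differ only in how they act on the parsed command, which is the difference this post is about.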
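Sending messages
MVC
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession

fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) {
    val objectMapper = jacksonObjectMapper()
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    // Blocking send: returns after the message has been written
    session.sendMessage(TextMessage(jsonPayload))
}
Reactive
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import reactor.core.publisher.Mono

fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) {
    val objectMapper = jacksonObjectMapper()
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    val message = session.textMessage(jsonPayload)
    // send() only describes the write; subscribe() triggers it without waiting for completion
    session.send(Mono.just(message)).subscribe()
}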
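The reactive helper above calls session.send(...).subscribe() once per message, which starts each write without waiting for it to finish. An alternative pattern, shown here only as a sketch, is to give each connection one outbound stream and pass it to a single session.send call, so that writes stay ordered and the send is tied to the handler's lifecycle. Sinks is Reactor's own API; the class name SinkBackedEchoHandler and the echo-only behaviour are assumptions made for illustration and are not the original project's code.

```kotlin
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks

// Illustrative handler (assumption): echoes every text frame through a per-connection sink.
class SinkBackedEchoHandler : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // One outbound queue per connection; everything sent to this client goes through it.
        val outbound = Sinks.many().unicast().onBackpressureBuffer<String>()

        // Inbound side: push an echo into the sink for each received text frame.
        // Only the receive pipeline emits here; emitting from several threads
        // would require checking the EmitResult returned by tryEmitNext.
        val input = session.receive()
            .map { it.payloadAsText }
            .doOnNext { payload -> outbound.tryEmitNext("Echo: $payload") }
            .doFinally { outbound.tryEmitComplete() }
            .then()

        // Outbound side: a single send() that drains the sink for the whole connection.
        val output = session.send(outbound.asFlux().map { session.textMessage(it) })

        // Completes when either side ends, cancelling the other.
        return Mono.zip(input, output).then()
    }
}
```

In a setup like this, a session manager would keep the sink for each session rather than the session itself, and sendToSession would become a tryEmitNext on that sink.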
WS endpoint configuration
MVC
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

@Configuration
@EnableWebSocket
class WebSocketConfig(
    // MyWebSocketHandler is already a @Component, so it is injected here
    // rather than declared a second time through a @Bean method
    private val webSocketHandler: MyWebSocketHandler,
    private val actorWebSocketHandler: SocketHandlerWithActor,
    private val socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
    private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        // Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler
        registry.addHandler(actorWebSocketHandler, "/ws-actor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for User
        registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}
Reactive
package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer

@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {

    override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
        // Serve static files (for example testclient.html) from classpath:/static/
        registry.addResourceHandler("/**")
            .addResourceLocations("classpath:/static/")
    }
}

package org.example.kotlinbootreactivelabs.ws

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy

@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
        return WebSocketHandlerAdapter(webSocketService())
    }

    @Bean
    fun webSocketService(): WebSocketService {
        return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
    }

    @Bean
    fun webSocketHandlerMapping(): HandlerMapping {
        // Map the WebSocket endpoint URL to its handler
        val map = mapOf("/ws-reactive" to reactiveSocketHandler)
        val handlerMapping = SimpleUrlHandlerMapping()
        handlerMapping.order = 1
        handlerMapping.urlMap = map
        return handlerMapping
    }
}
- WebFluxConfig (a WebFluxConfigurer): needed when serving static web resources (testclient.html).
- ReactiveWebSocketConfig: registers the reactive socket's WS endpoint together with its handler.
SessionManager
This is the basic object that manages the sockets: it handles connection management and provides PUB/SUB and SendToSession.
It can be extended later by connecting it to an external MQ.
MVC
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.web.socket.WebSocketSession
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val sessions = ConcurrentHashMap<String, WebSocketSession>()
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
        logger.info("Connected: ${session.id}")
    }

    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        logger.info("Disconnected: ${session.id}")
    }

    fun subscribeToTopic(sessionId: String, topic: String) {
        // Use a concurrent set: subscriber sets are read and modified from several threads
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.let {
            sendEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let {
                sendEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}
Reactive
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: ReactiveWebSocketSession) {
        reactiveSessions[session.id] = session
        logger.info("[SessionManager] Connected: ${session.id}")

        // Tell the client its session id right after it connects
        sendReactiveEventTextMessage(session, EventTextMessage(
            type = MessageType.INFO,
            message = "You are connected - ${session.id}",
            from = MessageFrom.SYSTEM,
            id = null,
            jsondata = null,
        ))
    }

    fun removeSession(session: ReactiveWebSocketSession) {
        reactiveSessions.remove(session.id)
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }

    fun subscribeReactiveToTopic(sessionId: String, topic: String) {
        // Use a concurrent set: subscriber sets are read and modified from several threads
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendReactiveMessageToSession(sessionId: String, message: String) {
        reactiveSessions[sessionId]?.let {
            sendReactiveEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendReactiveMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            reactiveSessions[sessionId]?.let {
                sendReactiveEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}
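In both session managers, removeSession drops the session itself but leaves its id in the topic subscription sets. As a sketch of how the topic bookkeeping could be kept in its own small class that also cleans up on disconnect, the following uses only the JDK's concurrent collections. TopicRegistry and its method names are hypothetical and do not appear in the original project.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper (assumption): tracks which session ids are subscribed to which topic.
class TopicRegistry {
    private val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun subscribe(sessionId: String, topic: String) {
        // compute() runs atomically per key, so creating the set and adding to it cannot race
        topicSubscriptions.compute(topic) { _, ids ->
            (ids ?: ConcurrentHashMap.newKeySet<String>()).apply { add(sessionId) }
        }
    }

    fun unsubscribe(sessionId: String, topic: String) {
        // Returning null from computeIfPresent removes a topic that has no subscribers left
        topicSubscriptions.computeIfPresent(topic) { _, ids ->
            ids.remove(sessionId)
            if (ids.isEmpty()) null else ids
        }
    }

    // Call this from removeSession so a closed connection leaves no stale ids behind
    fun removeSession(sessionId: String) {
        topicSubscriptions.keys.toList().forEach { topic -> unsubscribe(sessionId, topic) }
    }

    // Returns a snapshot copy, safe to iterate while other threads subscribe or unsubscribe
    fun subscribers(topic: String): Set<String> = topicSubscriptions[topic]?.toSet() ?: emptySet()
}

fun main() {
    val registry = TopicRegistry()
    registry.subscribe("s1", "news")
    registry.subscribe("s2", "news")
    registry.subscribe("s1", "sports")

    registry.removeSession("s1")

    println(registry.subscribers("news"))   // [s2]
    println(registry.subscribers("sports")) // []
}
```

Because the class has no Spring types in it, the same registry could back either the MVC or the Reactive session manager.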
- https://docs.spring.io/spring-integration/reference/reactive-streams.html#reactive-channel-adapters
- I expect that the reactive adapters described there will make pub/sub stream processing more elegant.
- https://docs.spring.io/spring-integration/reference/amqp.html
A test tool for the pub/sub/sendToSession features of the socket written in Reactive mode is also available, and the full code is included.
- Authentication is left out; pub/sub/sendToSession can all be exercised through the API.
- Full code: https://github.com/psmon/java-labs/tree/master/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/ws
- Using MVC WebSocket in various ways: MVC WEBSOCKET. Only the way the SocketHandler processes messages differs, and various variant features can be applied.