MVC WebSocket을 생성하고 운영하는 방식은 이전 장을 참고하면 되며,
여기서는 WebFlux가 지원하는 ReactiveWebSocket을 살펴보겠습니다.
두 방식에는 다음과 같은 큰 차이가 있기 때문에, 적은 자원으로 웹소켓 커넥션을 높은 성능과 효율로 운영하려면 Reactive(WebFlux) 모드를 권장합니다.
Spring MVC
Spring WebFlux (Reactive)
인터페이스만 구분하면 MVC 웹소켓과 Reactive 웹소켓을 하나의 Spring Boot 애플리케이션 안에서 함께 운영할 수 있는지 다양한 테스트를 수행해 보았지만,
스프링 부트는 기동 시 결정된 한 가지 방식(Servlet 또는 Reactive)만 지원하기 때문에 불가능합니다.
이후에는 MVC 방식과 Reactive 방식을 비교하면서 차이점을 설명합니다.
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-websocket") import org.springframework.web.socket.handler.TextWebSocketHandler |
implementation("org.springframework.boot:spring-boot-starter-webflux") import org.springframework.web.reactive.socket.WebSocketHandler |
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import org.springframework.stereotype.Component

/**
 * Servlet-stack (Spring MVC) text WebSocket handler.
 *
 * Registers and unregisters sessions with [WebSocketSessionManager] and
 * interprets two text commands: `subscribe:<topic>` and `unsubscribe:<topic>`.
 * Any other text frame is answered with an "Echo: ..." chat message.
 */
@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    /** Starts tracking the newly opened session. */
    override fun afterConnectionEstablished(session: WebSocketSession) {
        sessionManager.addSession(session)
    }

    /** Stops tracking the session once the connection is closed. */
    override fun afterConnectionClosed(
        session: WebSocketSession,
        status: org.springframework.web.socket.CloseStatus,
    ) {
        sessionManager.removeSession(session)
    }

    /**
     * Routes an incoming text frame: topic commands go to the session manager,
     * everything else is echoed back to the sender.
     */
    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val text = message.payload

        if (text.startsWith("subscribe:")) {
            sessionManager.subscribeToTopic(session.id, text.removePrefix("subscribe:"))
            return
        }
        if (text.startsWith("unsubscribe:")) {
            sessionManager.unsubscribeFromTopic(session.id, text.removePrefix("unsubscribe:"))
            return
        }

        echo(session, text)
    }

    /** Sends the received text back to [session] as a SYSTEM chat event. */
    private fun echo(session: WebSocketSession, text: String) {
        val reply = EventTextMessage(
            type = MessageType.CHAT,
            message = "Echo: $text",
            from = MessageFrom.SYSTEM,
            id = null,
            jsondata = null,
        )
        sendEventTextMessage(session, reply)
    }
}
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

/**
 * Reactive-stack (Spring WebFlux) WebSocket handler.
 *
 * Mirrors the MVC handler's protocol: `subscribe:<topic>` and
 * `unsubscribe:<topic>` update the session manager, and any other text frame
 * is answered with an "Echo: ..." chat message.
 */
@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    /**
     * Registers the session, processes every inbound frame, and unregisters
     * the session when the inbound stream terminates for any reason.
     *
     * @return a [Mono] that completes when the inbound stream ends
     */
    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

        val processedFrames = session.receive()
            .map { frame -> frame.payloadAsText }
            .flatMap { text -> dispatch(session, text) }

        return processedFrames
            .then()
            .doFinally { sessionManager.removeSession(session) }
    }

    /**
     * Handles one text frame and returns an empty [Mono].
     *
     * The helper calls are made for their side effects only; the value returned
     * by `sendReactiveEventTextMessage` (if any) is not chained into the
     * pipeline. NOTE(review): presumably the helper subscribes to the send
     * itself — confirm against its definition.
     */
    private fun dispatch(session: WebSocketSession, text: String): Mono<Void> {
        when {
            text.startsWith("subscribe:") ->
                sessionManager.subscribeReactiveToTopic(session.id, text.removePrefix("subscribe:"))

            text.startsWith("unsubscribe:") ->
                sessionManager.unsubscribeReactiveFromTopic(session.id, text.removePrefix("unsubscribe:"))

            else -> {
                val reply = EventTextMessage(
                    type = MessageType.CHAT,
                    message = "Echo: $text",
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
                sendReactiveEventTextMessage(session, reply)
            }
        }
        return Mono.empty<Void>()
    }
}
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.WebSocketHandler
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

/**
 * Servlet-stack WebSocket endpoint configuration.
 *
 * Exposes four endpoints, each with an [HttpSessionHandshakeInterceptor]:
 * `/ws` (local handler), `/ws-actor` (actor handler), `/ws-user` (personal
 * room handler) and `/ws-counselor` (counselor handler).
 */
@Configuration
@EnableWebSocket
class WebSocketConfig(
    private val webSocketHandler: MyWebSocketHandler,
    private val actorWebSocketHandler: SocketHandlerWithActor,
    // Was `var`; the reference is never reassigned, so it is now read-only.
    private val socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
    private val sessionManager: WebSocketSessionManager,
    private val socketHandlerForCounselor: SocketHandleForCounselor,
) : WebSocketConfigurer {

    // NOTE(review): MyWebSocketHandler is already injected through the
    // constructor above, so this factory method declares an additional handler
    // bean that is not the instance registered on "/ws". Kept so that the set of
    // declared beans is unchanged — confirm whether it can be removed.
    @Bean
    fun webSocketHandler() = MyWebSocketHandler(sessionManager)

    /** Registers every handler under its endpoint path, in a fixed order. */
    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        // Local WebSocket handler
        registry.expose(webSocketHandler, "/ws")
        // Actor WebSocket handler
        registry.expose(actorWebSocketHandler, "/ws-actor")
        // Actor WebSocket handler for User
        registry.expose(socketHandlerForPersnalRoom, "/ws-user")
        // Actor WebSocket handler for Counselor
        registry.expose(socketHandlerForCounselor, "/ws-counselor")
    }

    /**
     * Registers [handler] at [path] with the settings shared by all endpoints.
     *
     * NOTE(review): the allowed origin is the wildcard "*", i.e. no origin
     * restriction — acceptable for a lab project, tighten before production use.
     */
    private fun WebSocketHandlerRegistry.expose(handler: WebSocketHandler, path: String) {
        addHandler(handler, path)
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}
package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer

/**
 * WebFlux configuration that serves every request path (`/` and below) from the
 * `static/` directory on the classpath.
 */
@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {

    /** Maps all paths to static resources under `classpath:/static/`. */
    override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
        val registration = registry.addResourceHandler("/**")
        registration.addResourceLocations("classpath:/static/")
    }
}

// --- second snippet: reactive WebSocket endpoint configuration ---

package org.example.kotlinbootreactivelabs.ws

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService

/**
 * Reactive WebSocket wiring: maps `/ws-reactive` to [ReactiveSocketHandler] and
 * provides the adapter and handshake service used to serve it.
 */
@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    /** Adapter that lets the dispatcher invoke WebSocket handlers. */
    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter =
        WebSocketHandlerAdapter(webSocketService())

    /** Handshake service using the Reactor Netty upgrade strategy. */
    @Bean
    fun webSocketService(): WebSocketService =
        HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())

    /** URL mapping (order 1) that routes `/ws-reactive` to the reactive handler. */
    @Bean
    fun webSocketHandlerMapping(): HandlerMapping {
        val routes = mapOf("/ws-reactive" to reactiveSocketHandler)
        return SimpleUrlHandlerMapping().apply {
            order = 1
            urlMap = routes
        }
    }
}
커넥션을 관리하고 PUB/SUB 및 SendToSession 기능을 제공하는, 소켓 세션 관리용 기본 객체입니다.
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

/**
 * Registry of open servlet-stack WebSocket sessions and their topic
 * subscriptions.
 *
 * Sessions are keyed by session id. Each topic maps to the set of session ids
 * subscribed to it. Messages can be pushed to a single session or to every
 * subscriber of a topic.
 */
@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    /** Open sessions by session id. */
    val sessions = ConcurrentHashMap<String, WebSocketSession>()

    /** Subscribed session ids by topic name. */
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    /** Starts tracking [session]. */
    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
        logger.info("Connected: ${session.id}")
    }

    /**
     * Stops tracking [session] and drops its id from every topic, so closed
     * sessions do not accumulate in [topicSubscriptions].
     */
    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        topicSubscriptions.values.forEach { subscribers -> subscribers.remove(session.id) }
        logger.info("Disconnected: ${session.id}")
    }

    /** Subscribes [sessionId] to [topic], creating the topic on first use. */
    fun subscribeToTopic(sessionId: String, topic: String) {
        // The subscriber set is shared between connections, so it must be a
        // concurrent set rather than a plain mutable set.
        topicSubscriptions
            .computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }
            .add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    /** Unsubscribes [sessionId] from [topic]; a no-op for unknown topics. */
    fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    /** Pushes [message] to the session with [sessionId], if it is tracked. */
    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.let { push(it, message) }
    }

    /** Pushes [message] to every tracked session subscribed to [topic]. */
    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let { push(it, message) }
        }
    }

    /** Sends [message] to [session] as a SYSTEM push event. */
    private fun push(session: WebSocketSession, message: String) {
        sendEventTextMessage(
            session,
            EventTextMessage(
                type = MessageType.PUSH,
                message = message,
                from = MessageFrom.SYSTEM,
                id = null,
                jsondata = null,
            ),
        )
    }
}
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

/**
 * Registry of open reactive WebSocket sessions and their topic subscriptions.
 *
 * Sessions are keyed by session id. Each topic maps to the set of session ids
 * subscribed to it. Messages can be pushed to a single session or to every
 * subscriber of a topic.
 */
@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    /** Open reactive sessions by session id. */
    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()

    /** Subscribed session ids by topic name. */
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    /** Starts tracking [session] and sends it a "You are connected" INFO event. */
    fun addSession(session: ReactiveWebSocketSession) {
        reactiveSessions[session.id] = session
        logger.info("[SessionManager] Connected: ${session.id}")
        sendReactiveEventTextMessage(
            session,
            EventTextMessage(
                type = MessageType.INFO,
                message = "You are connected - ${session.id}",
                from = MessageFrom.SYSTEM,
                id = null,
                jsondata = null,
            ),
        )
    }

    /**
     * Stops tracking [session] and drops its id from every topic, so closed
     * sessions do not accumulate in [topicSubscriptions].
     */
    fun removeSession(session: ReactiveWebSocketSession) {
        reactiveSessions.remove(session.id)
        topicSubscriptions.values.forEach { subscribers -> subscribers.remove(session.id) }
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }

    /** Subscribes [sessionId] to [topic], creating the topic on first use. */
    fun subscribeReactiveToTopic(sessionId: String, topic: String) {
        // The subscriber set is shared between connections, so it must be a
        // concurrent set rather than a plain mutable set.
        topicSubscriptions
            .computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }
            .add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    /** Unsubscribes [sessionId] from [topic]; a no-op for unknown topics. */
    fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    /** Pushes [message] to the session with [sessionId], if it is tracked. */
    fun sendReactiveMessageToSession(sessionId: String, message: String) {
        reactiveSessions[sessionId]?.let { push(it, message) }
    }

    /** Pushes [message] to every tracked session subscribed to [topic]. */
    fun sendReactiveMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            reactiveSessions[sessionId]?.let { push(it, message) }
        }
    }

    /** Sends [message] to [session] as a SYSTEM push event. */
    private fun push(session: ReactiveWebSocketSession, message: String) {
        sendReactiveEventTextMessage(
            session,
            EventTextMessage(
                type = MessageType.PUSH,
                message = message,
                from = MessageFrom.SYSTEM,
                id = null,
                jsondata = null,
            ),
        )
    }
}
Reactive 모드로 작성된 소켓 전체 코드는 다음을 참고하세요.