Page History
MVC WEBSOCKET 은 이전 장에서 웹소켓을 생성하고 운영하는 방식을 살펴보면 되며
여기서는 WebFlux가 지원하는 ReactiveWebSocket을 살펴보겠습니다WebFlux가 지원하는 ReactiveWebSocket을 Spring Boot 기본 Websocket(MVC)과 구현방식과 작동방식의 차이를 알아보겠습니다.
다음과 같은 큰 차이가 있기때문에, 적은 자원으로 웹소켓 커넥션 고성능 효율을 가질려면 가지려면 Reactive-WebFlux모드가 권장됩니다.
Spring MVC
- 전통적인 스레드 기반 모델을 사용합니다.
- 요청마다 새로운 스레드를 생성하거나 스레드 풀에서 가져옵니다.
- 블로킹 I/O를 기반으로 동작합니다.
- WebSocket 핸들러는 동기적으로 처리됩니다.
Spring WebFlux ( Reactive)
- 리액티브 프로그래밍 모델을 사용합니다.
- 요청 처리 시 비동기 및 논블로킹 I/O를 기반으로 동작합니다.
- 리액티브 스트림과 Publisher-Subscriber 패턴을 활용합니다.
- WebSocket 핸들러는 비동기적으로 처리되며, 이를 통해 높은 동시성과 확장성을 제공합니다.
삽질방지 팁
혹시나 인터페이스 구분으로 MVC웹소켓과 Reactive웹소켓이 SpringBoot내에 함께 운영될수 있나? 다양한 테스트를 수행해보았지만
스프링부트의 런타임시 결정된 1가지 방식만 지원하기때문에 불가능합니다런타임시 스레드풀(Mvc)/디스패처(Reactive) 다른방식으로 운영되기때문에 스트링부트 런타임시 한가지 방식만 운영가능합니다.
- MVC WebSocket과 Reactive WebSocket을 동일 SpringBoot에서 함께 작동시킬수 없습니다. 이것은 RestAPI도 마찬가지입니다.
- Reactive 모드로 작동시키기위해 SpriingBoot의 초기 작동모드가 Reactive-WebFlux여야합니다. - EnableWebFlux 어노테이션이 어노테이션 이용됨
이후 MVC와 차이를 Webplux를 Reactive 로 차이점을 설명을 진행
의존요소 차이
MVC
규정하고 MVC VS REACTIVE 구현방법 차이점 설명
구현기본기능
로컬에서 기본적인 커넥션관리를 하고 로컬세션 PubSub지원및 SendToSession 을할수 있는 기본적인 기능만 ReactiveSocket을 이용해 시도해보겠습니다.
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
- Local SocketSession Manage 중간 계층을 둠으로~ 분산상태를 관리할때 추후 다양한 MQ를 선택할수 있습니다.
의존요소 차이
MVC
Code Block | ||
---|---|---|
| ||
Code Block | ||
| ||
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-websocket") import org.springframework.web.socket.handler.TextWebSocketHandler |
...
- web.reactive.socket 하위항목 사용
Warning |
---|
주의 org.springframework.boot:spring-boot-starter-web , org.springframework.boot:spring-boot-starter-webflux 둘다 포함시켜도 빌드는 잘작동되지만 런타임은 MVC우선으로 작동됩니다. 프로그래밍 방식도 다르기때문에 전환되는형태가 아니니 프로젝트 구성시 둘중 하나만 채택해 MVC/Reactive 모드로 갈지 초기에 선택합니다. |
메시지 모델
Code Block | ||
---|---|---|
| ||
enum class MessageType {
CHAT, //For Chat
CHATBLOCK, //For ChatBot Block
PUSH, //For Push Notification
INFO, ERROR, //For SystemMessage
SESSIONID //For Session ID Update
}
enum class MessageFrom {
USER, COUNSELOR, SYSTEM
}
class EventTextMessage(
@JsonFormat(shape = JsonFormat.Shape.STRING)
val type: MessageType,
val message: String,
@JsonFormat(shape = JsonFormat.Shape.STRING)
val from: MessageFrom,
var id: String? = null,
val jsondata: String? = null
) |
- 프론트(.js)와 메시지 규약을 맞추고 시작함으로 유연한 메시지처리를 할수 있게됩니다.
SocketHandler SocketHandler 구현
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom import com.example.kotlinbootlabs.ws.handler.auth.MessageType import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage import org.springframework.web.socket.TextMessage import org.springframework.web.socket.WebSocketSession import org.springframework.web.socket.handler.TextWebSocketHandler import org.springframework.stereotype.Component @Component class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() { override fun afterConnectionEstablished(session: WebSocketSession) { sessionManager.addSession(session) } override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) { sessionManager.removeSession(session) } override fun handleTextMessage(session: WebSocketSession, message: TextMessage) { val payload = message.payload when { payload.startsWith("subscribe:") -> { val topic = payload.substringAfter("subscribe:") sessionManager.subscribeToTopic(session.id, topic) } payload.startsWith("unsubscribe:") -> { val topic = payload.substringAfter("unsubscribe:") sessionManager.unsubscribeFromTopic(session.id, topic) } else -> { sendEventTextMessage(session, EventTextMessage( type = MessageType.CHAT, message = "Echo: $payload", from = MessageFrom.SYSTEM, id = null, jsondata = null, )) } } } } |
...
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage import org.example.kotlinbootreactivelabs.ws.model.MessageFrom import org.example.kotlinbootreactivelabs.ws.model.MessageType import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage import org.springframework.stereotype.Component import org.springframework.web.reactive.socket.WebSocketHandler import org.springframework.web.reactive.socket.WebSocketSession import reactor.core.publisher.Mono @Component class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler { override fun handle(session: WebSocketSession): Mono<Void> { sessionManager.addSession(session) return session.receive() .map { it.payloadAsText } .flatMap { payload -> when { payload.startsWith("subscribe:") -> { val topic = payload.substringAfter("subscribe:") sessionManager.subscribeReactiveToTopic(session.id, topic) Mono.empty<Void>() } payload.startsWith("unsubscribe:") -> { val topic = payload.substringAfter("unsubscribe:") sessionManager.unsubscribeReactiveFromTopic(session.id, topic) Mono.empty<Void>() } else -> { sendReactiveEventTextMessage( session, EventTextMessage( type = MessageType.CHAT, message = "Echo: $payload", from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) Mono.empty<Void>() } } } .then() .doFinally { sessionManager.removeSession(session) } } } |
전송방법
MVC
Code Block | ||
---|---|---|
| ||
fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) { val objectMapper = }jacksonObjectMapper() val jsonPayload = objectMapper.writeValueAsString(eventTextMessage) } session.sendMessage(TextMessage(jsonPayload)) } |
REACTIVE
Code Block | ||
---|---|---|
| ||
fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) { val objectMapper = .thenjacksonObjectMapper() val jsonPayload = objectMapper.writeValueAsString(eventTextMessage) val .doFinallymessage {= sessionManagersession.removeSessiontextMessage(sessionjsonPayload) } }session.send(Mono.just(message)).subscribe() } |
WS EndPoint Config설정
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.web.socket.config.annotation.EnableWebSocket import org.springframework.web.socket.config.annotation.WebSocketConfigurer import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor @Configuration @EnableWebSocket class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler, private val actorWebSocketHandler: SocketHandlerWithActor, private var socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom, private val sessionManager: WebSocketSessionManager, private val socketHandlerForCounselor: SocketHandleForCounselor ) : WebSocketConfigurer { @Bean fun webSocketHandler() = MyWebSocketHandler(sessionManager) override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) { // Local WebSocket handler registry.addHandler(webSocketHandler, "/ws") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") // Actor WebSocket handler registry.addHandler(actorWebSocketHandler, "/ws-actor") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") // Actor WebSocket handler for User registry.addHandler(socketHandlerForPersnalRoom, "/ws-user") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") // Actor WebSocket handler for Counselor registry.addHandler(socketHandlerForCounselor, "/ws-counselor") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") } } |
...
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.config import org.springframework.context.annotation.Configuration import org.springframework.web.reactive.config.EnableWebFlux import org.springframework.web.reactive.config.ResourceHandlerRegistry import org.springframework.web.reactive.config.WebFluxConfigurer @Configuration @EnableWebFlux class WebFluxConfig : WebFluxConfigurer { override fun addResourceHandlers(registry: ResourceHandlerRegistry) { registry.addResourceHandler("/**") .addResourceLocations("classpath:/static/") } } package org.example.kotlinbootreactivelabs.ws import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.web.reactive.HandlerMapping import org.springframework.web.reactive.config.EnableWebFlux import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy import org.springframework.web.reactive.socket.server.WebSocketService import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService @Configuration @EnableWebFlux class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) { @Bean fun webSocketHandlerAdapter(): WebSocketHandlerAdapter { return WebSocketHandlerAdapter(webSocketService()) } @Bean fun webSocketService(): WebSocketService { return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy()) } @Bean fun webSocketHandlerMapping(): HandlerMapping { val map = mapOf("/ws-reactive" to reactiveSocketHandler) val handlerMapping = SimpleUrlHandlerMapping() handlerMapping.order = 1 handlerMapping.urlMap = map return handlerMapping } } |
- WebFluxConfigurer : static web 리소스를 이용할때 설정이 필요합니다. ( testclient.html )
- ReactiveWebSocketConfig : reactive socket의 ws endpoint를 핸들러와 함께 설정합니다.
SessionManager
커넥션 관리를 기본적으로 하고 PUB/SUB및 SendToSession 이 기능이 있는 소켓을 관리하는 기본 정의객체객체로~
외부 MQ와 연결해 확장해 나갈수 있습니다.
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom import com.example.kotlinbootlabs.ws.handler.auth.MessageType import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage import org.springframework.web.socket.WebSocketSession import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap @Component class WebSocketSessionManager { private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java) val sessions = ConcurrentHashMap<String, WebSocketSession>() val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>() fun addSession(session: WebSocketSession) { sessions[session.id] = session logger.info("Connected: ${session.id}") } fun removeSession(session: WebSocketSession) { sessions.remove(session.id) logger.info("Disconnected: ${session.id}") } fun subscribeToTopic(sessionId: String, topic: String) { topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId) logger.info("Session $sessionId subscribed to topic $topic") } fun unsubscribeFromTopic(sessionId: String, topic: String) { topicSubscriptions[topic]?.remove(sessionId) logger.info("Session $sessionId unsubscribed from topic $topic") } fun sendMessageToSession(sessionId: String, message: String) { sessions[sessionId]?.let { sendEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } fun sendMessageToTopic(topic: String, message: String) { topicSubscriptions[topic]?.forEach { sessionId -> sessions[sessionId]?.let { sendEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } } } |
...
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage import org.example.kotlinbootreactivelabs.ws.model.MessageFrom import org.example.kotlinbootreactivelabs.ws.model.MessageType import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap @Component class WebSocketSessionManager { private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java) val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>() val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>() fun addSession(session: ReactiveWebSocketSession) { reactiveSessions[session.id] = session logger.info("[SessionManager] Connected: ${session.id}") sendReactiveEventTextMessage(session, EventTextMessage( type = MessageType.INFO, message = "You are connected - ${session.id}", from = MessageFrom.SYSTEM, id = null, jsondata = null, )) } fun removeSession(session: ReactiveWebSocketSession) { reactiveSessions.remove(session.id) logger.info("[SessionManager] Disconnected: ${session.id}") } fun subscribeReactiveToTopic(sessionId: String, topic: String) { topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId) logger.info("Session $sessionId subscribed to topic $topic") } fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) { topicSubscriptions[topic]?.remove(sessionId) logger.info("Session $sessionId unsubscribed from topic $topic") } fun sendReactiveMessageToSession(sessionId: String, message: String) { reactiveSessions[sessionId]?.let { sendReactiveEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } fun sendReactiveMessageToTopic(topic: String, message: String) { topicSubscriptions[topic]?.forEach { sessionId -> reactiveSessions[sessionId]?.let { sendReactiveEventTextMessage( it, EventTextMessage( type = MessageType.PUSH, message = message, from = MessageFrom.SYSTEM, id = null, jsondata = null, ) ) } } } } |
Reactive 모드로 작성된 소켓 전체 코드는 다음참고
}
}
} |
- https://docs.spring.io/spring-integration/reference/reactive-streams.html#reactive-channel-adapters
- reactive가 지원하는 reactive adapter를 이용해 pub/sub stream 처리를 더 우아하게 할것으로 기대해봅니다.
- https://docs.spring.io/spring-integration/reference/amqp.html
Reactive 모드로 작성된 소켓 pub/sub/sentoSession 위한 TEST 툴도 준비되어있으며 전체코드도 포함되어있습니다.
- 인증모드는 제외되었으며 API를 통해 pub/sub/sendToSession 기능을 모두 수행할수 있습니다.
- 전체 코드 : https://github.com/psmon/java-labs/tree/master/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/ws
- MCV Websocket을 다양하게 이용하기 : MVC WEBSOCKET SocketHandeler 처리방식만 다르며 다양한 변종기능을 적용할수 있습니다.