Page History
MVC WEBSOCKET 은 이전 장에서 웹소켓을 생성하고 운영하는 방식을 살펴보면 되며
여기서는 WebFlux가 지원하는 ReactiveWebSocket을 살펴보겠습니다.
다음과 같은 큰 차이가 있기때문에, 적은 자원으로 웹소켓 커넥션 고성능 효율을 가질려면 Reactive-WebFlux모드가 권장됩니다.
Spring MVC
- 전통적인 스레드 기반 모델을 사용합니다.
- 요청마다 새로운 스레드를 생성하거나 스레드 풀에서 가져옵니다.
- 블로킹 I/O를 기반으로 동작합니다.
- WebSocket 핸들러는 동기적으로 처리됩니다.
Spring WebFlux ( Reactive)
- 리액티브 프로그래밍 모델을 사용합니다.
- 요청 처리 시 비동기 및 논블로킹 I/O를 기반으로 동작합니다.
- 리액티브 스트림과 Publisher-Subscriber 패턴을 활용합니다.
- WebSocket 핸들러는 비동기적으로 처리되며, 이를 통해 높은 동시성과 확장성을 제공합니다.
삽질방지 팁
혹시나 인터페이스 구분으로 MVC웹소켓과 Reactive웹소켓이 SpringBoot내에 함께 운영될수 있나? 다양한 테스트를 수행해보았지만
스프링부트의 런타임시 결정된 1가지 방식만 지원하기때문에 불가능합니다.
- MVC WebSocket과 Reactive WebSocket을 동일 SpringBoot에서 함께 작동시킬수 없습니다. 이것은 RestAPI도 마찬가지입니다.
- Reactive 모드로 작동시키기위해 SpriingBoot의 초기 작동모드가 Reactive-WebFlux여야합니다. - EnableWebFlux 어노테이션이 이용됨
이후 MVC와 차이를 Reactive 로 차이점을 설명을 진행
의존요소 차이
MVC
Code Block | ||
---|---|---|
| ||
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")
import org.springframework.web.socket.handler.TextWebSocketHandler |
- web.socket 하위항목사용
Reactive
Code Block | ||
---|---|---|
| ||
implementation("org.springframework.boot:spring-boot-starter-webflux")
import org.springframework.web.reactive.socket.WebSocketHandler |
- web.reactive.socket 하위항목 사용
SocketHandler 구현
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws
import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import org.springframework.stereotype.Component
@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {
override fun afterConnectionEstablished(session: WebSocketSession) {
sessionManager.addSession(session)
}
override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) {
sessionManager.removeSession(session)
}
override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
val payload = message.payload
when {
payload.startsWith("subscribe:") -> {
val topic = payload.substringAfter("subscribe:")
sessionManager.subscribeToTopic(session.id, topic)
}
payload.startsWith("unsubscribe:") -> {
val topic = payload.substringAfter("unsubscribe:")
sessionManager.unsubscribeFromTopic(session.id, topic)
}
else -> {
sendEventTextMessage(session, EventTextMessage(
type = MessageType.CHAT,
message = "Echo: $payload",
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
))
}
}
}
} |
Reactive
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws
import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono
@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
sessionManager.addSession(session)
return session.receive()
.map { it.payloadAsText }
.flatMap { payload ->
when {
payload.startsWith("subscribe:") -> {
val topic = payload.substringAfter("subscribe:")
sessionManager.subscribeReactiveToTopic(session.id, topic)
Mono.empty<Void>()
}
payload.startsWith("unsubscribe:") -> {
val topic = payload.substringAfter("unsubscribe:")
sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
Mono.empty<Void>()
}
else -> {
sendReactiveEventTextMessage(
session, EventTextMessage(
type = MessageType.CHAT,
message = "Echo: $payload",
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
)
)
Mono.empty<Void>()
}
}
}
.then()
.doFinally { sessionManager.removeSession(session) }
}
} |
WS EndPoint Config설정
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor
@Configuration
@EnableWebSocket
class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler,
private val actorWebSocketHandler: SocketHandlerWithActor,
private var socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
private val sessionManager: WebSocketSessionManager,
private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {
@Bean
fun webSocketHandler() = MyWebSocketHandler(sessionManager)
override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
// Local WebSocket handler
registry.addHandler(webSocketHandler, "/ws")
.addInterceptors(HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*")
// Actor WebSocket handler
registry.addHandler(actorWebSocketHandler, "/ws-actor")
.addInterceptors(HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*")
// Actor WebSocket handler for User
registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
.addInterceptors(HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*")
// Actor WebSocket handler for Counselor
registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
.addInterceptors(HttpSessionHandshakeInterceptor())
.setAllowedOrigins("*")
}
}
|
Reactive
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.config
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer
@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {
override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
registry.addResourceHandler("/**")
.addResourceLocations("classpath:/static/")
}
}
package org.example.kotlinbootreactivelabs.ws
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService
@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {
@Bean
fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
return WebSocketHandlerAdapter(webSocketService())
}
@Bean
fun webSocketService(): WebSocketService {
return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
}
@Bean
fun webSocketHandlerMapping(): HandlerMapping {
val map = mapOf("/ws-reactive" to reactiveSocketHandler)
val handlerMapping = SimpleUrlHandlerMapping()
handlerMapping.order = 1
handlerMapping.urlMap = map
return handlerMapping
}
} |
SessionManager
커넥션 관리를 하고 PUB/SUB및 SendToSession 이 있는 소켓을 관리하는 기본 정의객체
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws
import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
@Component
class WebSocketSessionManager {
private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)
val sessions = ConcurrentHashMap<String, WebSocketSession>()
val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()
fun addSession(session: WebSocketSession) {
sessions[session.id] = session
logger.info("Connected: ${session.id}")
}
fun removeSession(session: WebSocketSession) {
sessions.remove(session.id)
logger.info("Disconnected: ${session.id}")
}
fun subscribeToTopic(sessionId: String, topic: String) {
topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId)
logger.info("Session $sessionId subscribed to topic $topic")
}
fun unsubscribeFromTopic(sessionId: String, topic: String) {
topicSubscriptions[topic]?.remove(sessionId)
logger.info("Session $sessionId unsubscribed from topic $topic")
}
fun sendMessageToSession(sessionId: String, message: String) {
sessions[sessionId]?.let {
sendEventTextMessage(
it, EventTextMessage(
type = MessageType.PUSH,
message = message,
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
)
)
}
}
fun sendMessageToTopic(topic: String, message: String) {
topicSubscriptions[topic]?.forEach { sessionId ->
sessions[sessionId]?.let {
sendEventTextMessage(
it, EventTextMessage(
type = MessageType.PUSH,
message = message,
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
)
)
}
}
}
} |
Reactive
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws
import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap
@Component
class WebSocketSessionManager {
private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)
val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()
val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()
fun addSession(session: ReactiveWebSocketSession) {
reactiveSessions[session.id] = session
logger.info("[SessionManager] Connected: ${session.id}")
sendReactiveEventTextMessage(session, EventTextMessage(
type = MessageType.INFO,
message = "You are connected - ${session.id}",
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
))
}
fun removeSession(session: ReactiveWebSocketSession) {
reactiveSessions.remove(session.id)
logger.info("[SessionManager] Disconnected: ${session.id}")
}
fun subscribeReactiveToTopic(sessionId: String, topic: String) {
topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId)
logger.info("Session $sessionId subscribed to topic $topic")
}
fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
topicSubscriptions[topic]?.remove(sessionId)
logger.info("Session $sessionId unsubscribed from topic $topic")
}
fun sendReactiveMessageToSession(sessionId: String, message: String) {
reactiveSessions[sessionId]?.let {
sendReactiveEventTextMessage(
it, EventTextMessage(
type = MessageType.PUSH,
message = message,
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
)
)
}
}
fun sendReactiveMessageToTopic(topic: String, message: String) {
topicSubscriptions[topic]?.forEach { sessionId ->
reactiveSessions[sessionId]?.let {
sendReactiveEventTextMessage(
it, EventTextMessage(
type = MessageType.PUSH,
message = message,
from = MessageFrom.SYSTEM,
id = null,
jsondata = null,
)
)
}
}
}
} |