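This page compares the Reactive WebSocket supported by WebFlux with the default Spring Boot WebSocket (MVC), looking at how each is implemented and how each behaves at runtime.
Because of the major differences listed below, the Reactive (WebFlux) mode is recommended when you need to serve many WebSocket connections efficiently with few resources.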
Spring MVC
- Uses the traditional thread-based model.
- Each request is handled on its own thread, either newly created or taken from a thread pool.
- Runs on blocking I/O.
- WebSocket handlers are processed synchronously.
Spring WebFlux (Reactive)
- Uses the reactive programming model.
- Handles requests with asynchronous, non-blocking I/O.
- Builds on Reactive Streams and the Publisher-Subscriber pattern.
- WebSocket handlers are processed asynchronously, which gives high concurrency and scalability.
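Can the MVC WebSocket and the Reactive WebSocket run side by side in one Spring Boot application simply because they use separate interfaces? I ran a variety of tests, but the two rely on different runtime models (a thread pool for MVC, an event-loop dispatcher for Reactive), so a Spring Boot runtime can operate in only one of the two modes.
- MVC WebSocket and Reactive WebSocket cannot run together in the same Spring Boot application. The same applies to REST APIs.
- To run in Reactive mode, Spring Boot must start in Reactive (WebFlux) mode. The sample project uses the @EnableWebFlux annotation.
From here on, WebFlux is referred to as Reactive, and the rest of the page explains how the MVC and Reactive implementations differ.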
Basic features implemented
Using the Reactive WebSocket, we implement only the basics: local connection management, Pub/Sub over local sessions, and SendToSession.
[draw.io diagram]
- Placing a Local SocketSession Manage layer in the middle leaves room to choose among various MQs later, when distributed state has to be managed.
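The idea of a middle layer can be sketched as a small interface with an in-memory implementation. This is only an illustration under assumptions: `TopicBroker`, `LocalTopicBroker`, and the `deliver` callback are hypothetical names that do not appear in the original project, and an MQ-backed implementation is left out.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical abstraction (not part of the original code base): handlers talk
// to this interface, so the in-memory version could later be replaced by an
// MQ-backed one without touching the handlers.
interface TopicBroker {
    fun subscribe(topic: String, sessionId: String)
    fun unsubscribe(topic: String, sessionId: String)
    fun publish(topic: String, message: String)
}

// Single-node, in-memory implementation. `deliver` stands in for the
// transport-specific send (an MVC or a Reactive session).
class LocalTopicBroker(
    private val deliver: (sessionId: String, message: String) -> Unit
) : TopicBroker {
    private val subscribers = ConcurrentHashMap<String, MutableSet<String>>()

    override fun subscribe(topic: String, sessionId: String) {
        // A concurrent set keeps add/remove safe across threads.
        subscribers.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
    }

    override fun unsubscribe(topic: String, sessionId: String) {
        subscribers[topic]?.remove(sessionId)
    }

    override fun publish(topic: String, message: String) {
        subscribers[topic]?.forEach { sessionId -> deliver(sessionId, message) }
    }
}

fun main() {
    val delivered = mutableListOf<String>()
    val broker: TopicBroker = LocalTopicBroker { sessionId, message ->
        delivered.add("$sessionId <- $message")
    }
    broker.subscribe("news", "s1")
    broker.subscribe("news", "s2")
    broker.unsubscribe("news", "s2")
    broker.publish("news", "hello")
    println(delivered) // [s1 <- hello]
}
```

Keeping the transport behind a callback means the same broker could serve either the MVC or the Reactive session type.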
Dependency differences
MVC
Code Block | ||
---|---|---|
| ||
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")
import org.springframework.web.socket.handler.TextWebSocketHandler |
- Uses the classes under web.socket
Reactive
Code Block | ||
---|---|---|
| ||
implementation("org.springframework.boot:spring-boot-starter-webflux")
import org.springframework.web.reactive.socket.WebSocketHandler |
- Uses the classes under web.reactive.socket
Warning |
---|
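Caution: including both org.springframework.boot:spring-boot-starter-web and org.springframework.boot:spring-boot-starter-webflux still builds without errors, but at runtime the application starts in MVC mode. The programming models also differ, so one cannot simply be switched to the other. Decide between MVC and Reactive when the project is set up and include only that starter. |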
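If both starters do end up on the classpath, Spring Boot lets the application request the reactive stack explicitly through `SpringApplication.setWebApplicationType`. The sketch below shows that setting; `ReactiveLabsApplication` is a hypothetical class name, and this page's recommendation to include only one starter still stands.

```kotlin
import org.springframework.boot.WebApplicationType
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class ReactiveLabsApplication // hypothetical application class name

fun main(args: Array<String>) {
    runApplication<ReactiveLabsApplication>(*args) {
        // Request the reactive (WebFlux) web stack explicitly instead of
        // relying on classpath detection, which prefers MVC when both exist.
        webApplicationType = WebApplicationType.REACTIVE
    }
}
```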
Message model
Code Block | ||
---|---|---|
| ||
import com.fasterxml.jackson.annotation.JsonFormat

// Message categories shared with the front end.
enum class MessageType {
    CHAT,        // chat message
    CHATBLOCK,   // chatbot block
    PUSH,        // push notification
    INFO, ERROR, // system messages
    SESSIONID    // session ID update
}

enum class MessageFrom {
    USER, COUNSELOR, SYSTEM
}

class EventTextMessage(
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val type: MessageType,
    val message: String,
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val from: MessageFrom,
    var id: String? = null,
    val jsondata: String? = null
) |
- Agreeing on the message contract with the front end (.js) up front makes message handling flexible later on.
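To make the contract concrete, the following sketch round-trips a message through Jackson's Kotlin module, which is the serializer the send helpers on this page use. The `encode` and `decode` helpers are hypothetical names added for illustration, and the model classes are repeated (without annotations) only so the block is self-contained.

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

enum class MessageType { CHAT, CHATBLOCK, PUSH, INFO, ERROR, SESSIONID }
enum class MessageFrom { USER, COUNSELOR, SYSTEM }

class EventTextMessage(
    val type: MessageType,
    val message: String,
    val from: MessageFrom,
    var id: String? = null,
    val jsondata: String? = null
)

private val mapper = jacksonObjectMapper()

// Hypothetical helpers: what the server writes to the socket and how a
// JSON text frame can be read back into the model.
fun encode(message: EventTextMessage): String = mapper.writeValueAsString(message)
fun decode(json: String): EventTextMessage = mapper.readValue(json)

fun main() {
    val json = encode(EventTextMessage(MessageType.CHAT, "hello", MessageFrom.USER))
    println(json) // the JSON text frame the front end would receive
    val decoded = decode(json)
    println("${decoded.type} from ${decoded.from}: ${decoded.message}") // CHAT from USER: hello
}
```

The enum values travel as plain strings, so the front end can switch on `type` and `from` directly.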
SocketHandler implementation
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.socket.CloseStatus
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler

@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    // Register the session once the handshake has completed.
    override fun afterConnectionEstablished(session: WebSocketSession) {
        sessionManager.addSession(session)
    }

    override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {
        sessionManager.removeSession(session)
    }

    // Called synchronously for each incoming text frame.
    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val payload = message.payload
        when {
            payload.startsWith("subscribe:") -> {
                val topic = payload.substringAfter("subscribe:")
                sessionManager.subscribeToTopic(session.id, topic)
            }
            payload.startsWith("unsubscribe:") -> {
                val topic = payload.substringAfter("unsubscribe:")
                sessionManager.unsubscribeFromTopic(session.id, topic)
            }
            else -> {
                // Anything else is echoed back as a chat message.
                sendEventTextMessage(session, EventTextMessage(
                    type = MessageType.CHAT,
                    message = "Echo: $payload",
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                ))
            }
        }
    }
} |
Reactive
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

        // The inbound stream is processed without blocking; the returned Mono
        // completes when the connection closes.
        return session.receive()
            .map { it.payloadAsText }
            .flatMap { payload ->
                when {
                    payload.startsWith("subscribe:") -> {
                        val topic = payload.substringAfter("subscribe:")
                        sessionManager.subscribeReactiveToTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    payload.startsWith("unsubscribe:") -> {
                        val topic = payload.substringAfter("unsubscribe:")
                        sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    else -> {
                        // Anything else is echoed back as a chat message.
                        sendReactiveEventTextMessage(
                            session, EventTextMessage(
                                type = MessageType.CHAT,
                                message = "Echo: $payload",
                                from = MessageFrom.SYSTEM,
                                id = null,
                                jsondata = null,
                            )
                        )
                        Mono.empty<Void>()
                    }
                }
            }
            .then()
            .doFinally { sessionManager.removeSession(session) }
    }
} |
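Both handlers dispatch on the same text prefixes ("subscribe:", "unsubscribe:", anything else is echoed). As a sketch, that small protocol could be pulled into one pure function shared by either handler. `ClientCommand` and `parseCommand` are hypothetical names, not part of the original code.

```kotlin
// Hypothetical model of the text protocol used by both handlers.
sealed interface ClientCommand {
    data class Subscribe(val topic: String) : ClientCommand
    data class Unsubscribe(val topic: String) : ClientCommand
    data class Chat(val text: String) : ClientCommand
}

// Mirrors the prefix checks in the handlers: the topic is everything after
// the first "subscribe:" / "unsubscribe:" prefix, with no trimming.
fun parseCommand(payload: String): ClientCommand = when {
    payload.startsWith("subscribe:") ->
        ClientCommand.Subscribe(payload.substringAfter("subscribe:"))
    payload.startsWith("unsubscribe:") ->
        ClientCommand.Unsubscribe(payload.substringAfter("unsubscribe:"))
    else -> ClientCommand.Chat(payload)
}

fun main() {
    println(parseCommand("subscribe:news"))   // Subscribe(topic=news)
    println(parseCommand("unsubscribe:news")) // Unsubscribe(topic=news)
    println(parseCommand("hi"))               // Chat(text=hi)
}
```

Parsing outside the handler keeps the protocol testable without opening a socket.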
Sending messages
MVC
Code Block | ||
---|---|---|
| ||
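import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession

// Reuse a single ObjectMapper; creating one per message is expensive.
private val objectMapper = jacksonObjectMapper()

// Serializes the event to JSON and sends it, blocking the calling thread.
fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) {
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    session.sendMessage(TextMessage(jsonPayload))
} |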
Reactive
Code Block | ||
---|---|---|
| ||
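import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import reactor.core.publisher.Mono

private val objectMapper = jacksonObjectMapper() // reuse one mapper for all messages

fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) {
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    val message = session.textMessage(jsonPayload)
    // Fire-and-forget: subscribe() triggers the send; failures are not returned to the caller.
    session.send(Mono.just(message)).subscribe()
} |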
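The Reactive helper above subscribes to a separate `send` for every message, so send errors never reach the handler. One alternative worth considering, shown here only as a sketch, is to give each session a Reactor sink and compose a single outbound stream with the inbound one, in the style the Spring WebFlux reference uses (`Mono.zip(input, output)`). `SessionOutbox` and `OutboxSocketHandler` are hypothetical names and are not part of the original project.

```kotlin
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks

// Hypothetical per-session queue of outbound JSON strings.
class SessionOutbox {
    // Unicast: exactly one subscriber (the session's send); items are
    // buffered until that subscriber arrives.
    private val sink = Sinks.many().unicast().onBackpressureBuffer<String>()

    // Returns false if the emission was rejected (for example after close,
    // or when two threads emit at the same instant).
    fun offer(json: String): Boolean = sink.tryEmitNext(json).isSuccess

    fun asFlux(): Flux<String> = sink.asFlux()

    fun close() {
        sink.tryEmitComplete()
    }
}

// Hypothetical handler: echoes each inbound frame through the outbox.
class OutboxSocketHandler : WebSocketHandler {
    override fun handle(session: WebSocketSession): Mono<Void> {
        val outbox = SessionOutbox()

        // One send() for the whole session, fed by the outbox.
        val output = session.send(outbox.asFlux().map { session.textMessage(it) })

        val input = session.receive()
            .map { it.payloadAsText }
            .doOnNext { outbox.offer("Echo: $it") }
            .doFinally { outbox.close() }
            .then()

        // Completes when either direction finishes, so a closed connection
        // also ends the outbound stream.
        return Mono.zip(input, output).then()
    }
}
```

A session manager could hold the `SessionOutbox` instead of the raw session, so that `sendToSession` and topic broadcasts go through `offer`.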
WS endpoint configuration
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

@Configuration
@EnableWebSocket
class WebSocketConfig(
    // MyWebSocketHandler is already a @Component, so it is injected here
    // rather than declared a second time as a @Bean.
    private val webSocketHandler: MyWebSocketHandler,
    private val actorWebSocketHandler: SocketHandlerWithActor,
    private val socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
    private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        // "*" allows any origin; restrict it outside of local testing.

        // Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler
        registry.addHandler(actorWebSocketHandler, "/ws-actor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for User
        registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}
|
Reactive
Code Block | ||
---|---|---|
| ||
// First source file: static resource configuration
package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer

@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {
    // Serve static files (such as the test client page) from classpath:/static/.
    override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
        registry.addResourceHandler("/**")
            .addResourceLocations("classpath:/static/")
    }
}

// Second source file: WebSocket endpoint configuration
package org.example.kotlinbootreactivelabs.ws

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService

@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
        return WebSocketHandlerAdapter(webSocketService())
    }

    @Bean
    fun webSocketService(): WebSocketService {
        return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
    }

    // Maps the WebSocket URL to its handler.
    @Bean
    fun webSocketHandlerMapping(): HandlerMapping {
        val map = mapOf("/ws-reactive" to reactiveSocketHandler)
        val handlerMapping = SimpleUrlHandlerMapping()
        handlerMapping.order = 1
        handlerMapping.urlMap = map
        return handlerMapping
    }
} |
- WebFluxConfig (WebFluxConfigurer): required when serving static web resources (testclient.html).
- ReactiveWebSocketConfig: registers the reactive socket's WS endpoint together with its handler.
SessionManager
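The SessionManager is the basic object that manages sockets: it handles connection management and provides Pub/Sub and SendToSession.
It can be extended later by connecting it to an external MQ.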
MVC
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val sessions = ConcurrentHashMap<String, WebSocketSession>()
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
        logger.info("Connected: ${session.id}")
    }

    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        // Drop the closed session from every topic so stale IDs do not pile up.
        topicSubscriptions.values.forEach { it.remove(session.id) }
        logger.info("Disconnected: ${session.id}")
    }

    fun subscribeToTopic(sessionId: String, topic: String) {
        // A concurrent set is used because subscriptions change from several threads.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    // SendToSession: push a message to one session.
    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.let {
            sendEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    // Publish: push a message to every session subscribed to the topic.
    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let {
                sendEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
} |
Reactive
Code Block | ||
---|---|---|
| ||
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: ReactiveWebSocketSession) {
        reactiveSessions[session.id] = session
        logger.info("[SessionManager] Connected: ${session.id}")
        // Tell the client its session ID right after connecting.
        sendReactiveEventTextMessage(session, EventTextMessage(
            type = MessageType.INFO,
            message = "You are connected - ${session.id}",
            from = MessageFrom.SYSTEM,
            id = null,
            jsondata = null,
        ))
    }

    fun removeSession(session: ReactiveWebSocketSession) {
        reactiveSessions.remove(session.id)
        // Drop the closed session from every topic so stale IDs do not pile up.
        topicSubscriptions.values.forEach { it.remove(session.id) }
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }

    fun subscribeReactiveToTopic(sessionId: String, topic: String) {
        // A concurrent set is used because subscriptions change from several threads.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    // SendToSession: push a message to one session.
    fun sendReactiveMessageToSession(sessionId: String, message: String) {
        reactiveSessions[sessionId]?.let {
            sendReactiveEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    // Publish: push a message to every session subscribed to the topic.
    fun sendReactiveMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            reactiveSessions[sessionId]?.let {
                sendReactiveEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
} |
- https://docs.spring.io/spring-integration/reference/reactive-streams.html#reactive-channel-adapters
- The reactive channel adapters described there may allow pub/sub stream handling to be written more elegantly.
- https://docs.spring.io/spring-integration/reference/amqp.html
A test tool for pub/sub/sendToSession over the socket written in Reactive mode is also available, and the full source code is included.
- Authentication is left out; pub/sub/sendToSession can all be exercised through the API.
- Full code: https://github.com/psmon/java-labs/tree/master/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/ws
- Using MVC WebSocket in more varied ways: MVC WEBSOCKET. Only the SocketHandler processing style differs, so a range of variant features can be applied.