This page compares the reactive WebSocket supported by WebFlux with Spring Boot's default WebSocket (MVC), looking at how each is implemented and how each behaves at runtime.


Because of the major differences listed below, the Reactive (WebFlux) mode is recommended when WebSocket connections must be handled efficiently with limited resources.

  • Spring MVC

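    • Uses the traditional thread-based model.
    • Each request is handled on its own thread, either newly created or taken from a thread pool.
    • Built on blocking I/O.
    • WebSocket handlers are processed synchronously.
  • Spring WebFlux (Reactive)

    • Uses the reactive programming model.
    • Requests are handled with asynchronous, non-blocking I/O.
    • Builds on Reactive Streams and the Publisher-Subscriber pattern.
    • WebSocket handlers are processed asynchronously, which provides high concurrency and scalability.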


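Can an MVC WebSocket and a Reactive WebSocket run side by side in one Spring Boot application, separated only by interface? After a variety of tests, the answer is no.

The two run differently at runtime (a thread pool for MVC, a dispatcher for Reactive), so a Spring Boot application can operate in only one of the two modes.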

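  • An MVC WebSocket and a Reactive WebSocket cannot run together in the same Spring Boot application. The same applies to REST APIs.
  • To run in Reactive mode, the Spring Boot application must start in Reactive (WebFlux) mode from the beginning. The configuration on this page uses the @EnableWebFlux annotation.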

From here on, WebFlux is referred to as Reactive, and the implementation differences are described as MVC vs. Reactive.


Basic features to implement

Using the reactive socket, we implement only the basics: local connection management, Pub/Sub over local sessions, and SendToSession.

[Diagram (draw.io): wsneopusub]

  • Placing a local socket session management layer in the middle leaves room to choose from various MQs later, when distributed state has to be managed.
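
The idea of a middle layer can be sketched in plain Kotlin. This is only an illustration under assumed names: `TopicListener`, `MessageBroker`, and `LocalMessageBroker` are hypothetical and do not exist in the original project. The point is that the session layer talks to an interface, so an in-process implementation could later be swapped for one backed by an MQ.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical names throughout: none of these types come from the original project.
fun interface TopicListener {
    fun onMessage(topic: String, message: String)
}

// The session layer would depend only on this interface, not on a concrete MQ client.
interface MessageBroker {
    fun subscribe(topic: String, listener: TopicListener)
    fun publish(topic: String, message: String)
}

// In-process implementation for a single node; an MQ-backed one could replace it later.
class LocalMessageBroker : MessageBroker {
    private val listeners = ConcurrentHashMap<String, CopyOnWriteArrayList<TopicListener>>()

    override fun subscribe(topic: String, listener: TopicListener) {
        listeners.computeIfAbsent(topic) { CopyOnWriteArrayList() }.add(listener)
    }

    override fun publish(topic: String, message: String) {
        // Deliver only to listeners registered for this exact topic.
        listeners[topic]?.forEach { it.onMessage(topic, message) }
    }
}
```

A session manager could register one listener per topic and forward each delivered message to its local sessions, which keeps the socket code unchanged when the broker implementation changes.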

Dependency differences

MVC

Code Block
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")

import org.springframework.web.socket.handler.TextWebSocketHandler
  • Uses the classes under web.socket.


Reactive

Code Block
implementation("org.springframework.boot:spring-boot-starter-webflux")

import org.springframework.web.reactive.socket.WebSocketHandler
  • Uses the classes under web.reactive.socket.


Warning

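Caution: a project that includes both org.springframework.boot:spring-boot-starter-web and org.springframework.boot:spring-boot-starter-webflux still builds without problems,

but at runtime Spring Boot starts in MVC mode by default. The two programming models are also different, so one cannot simply be switched for the other.

Decide early, when setting up the project, whether to go MVC or Reactive, and include only one of the two starters.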
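
If both starters do end up on the classpath, Spring Boot lets the application type be set explicitly at startup. The fragment below is a minimal sketch of that setting, not the original project's entry point: the class name `ReactiveLabsApplication` is hypothetical, and the code needs the Spring Boot dependencies to compile.

```kotlin
import org.springframework.boot.WebApplicationType
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

// Hypothetical application class; the name is not taken from the original project.
@SpringBootApplication
class ReactiveLabsApplication

fun main(args: Array<String>) {
    runApplication<ReactiveLabsApplication>(*args) {
        // Select the reactive (WebFlux) stack explicitly instead of relying on classpath detection.
        setWebApplicationType(WebApplicationType.REACTIVE)
    }
}
```

This only selects which stack starts. Handlers written against web.socket still do not run on the reactive stack, so keeping a single starter remains the simpler choice.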



Message model

Code Block
import com.fasterxml.jackson.annotation.JsonFormat

enum class MessageType {
    CHAT,               //For Chat
    CHATBLOCK,          //For ChatBot Block
    PUSH,               //For Push Notification
    INFO, ERROR,        //For SystemMessage
    SESSIONID           //For Session ID Update
}

enum class MessageFrom {
    USER, COUNSELOR, SYSTEM
}

class EventTextMessage(
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val type: MessageType,

    val message: String,

    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val from: MessageFrom,

    var id: String? = null,
    val jsondata: String? = null
)
  • Agreeing on the message contract with the front end (.js) before starting makes flexible message handling possible.


SocketHandler implementation

MVC

Code Block
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import org.springframework.stereotype.Component

@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    override fun afterConnectionEstablished(session: WebSocketSession) {
        sessionManager.addSession(session)
    }

    override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) {
        sessionManager.removeSession(session)
    }

    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val payload = message.payload

        when {
            payload.startsWith("subscribe:") -> {
                val topic = payload.substringAfter("subscribe:")
                sessionManager.subscribeToTopic(session.id, topic)
            }
            payload.startsWith("unsubscribe:") -> {
                val topic = payload.substringAfter("unsubscribe:")
                sessionManager.unsubscribeFromTopic(session.id, topic)
            }
            else -> {
                sendEventTextMessage(session, EventTextMessage(
                    type = MessageType.CHAT,
                    message = "Echo: $payload",
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                ))
            }
        }
    }
}


Reactive

Code Block
package org.example.kotlinbootreactivelabs.ws


import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

        return session.receive()
            .map { it.payloadAsText }
            .flatMap { payload ->
                when {
                    payload.startsWith("subscribe:") -> {
                        val topic = payload.substringAfter("subscribe:")
                        sessionManager.subscribeReactiveToTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    payload.startsWith("unsubscribe:") -> {
                        val topic = payload.substringAfter("unsubscribe:")
                        sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    else -> {
                        sendReactiveEventTextMessage(
                            session, EventTextMessage(
                                type = MessageType.CHAT,
                                message = "Echo: $payload",
                                from = MessageFrom.SYSTEM,
                                id = null,
                                jsondata = null,
                            )
                        )
                        Mono.empty<Void>()
                    }
                }
            }
            .then()
            .doFinally { sessionManager.removeSession(session) }
    }
}
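
Both handlers above branch on the same text prefixes (`subscribe:` and `unsubscribe:`), with everything else treated as a chat message to echo. One way to keep that protocol in a single place is to parse the payload into a small command type first. The sketch below is plain Kotlin under assumed names: `ClientCommand` and `parseClientCommand` are hypothetical and not part of the original project.

```kotlin
// Hypothetical types: the original handlers branch on raw string prefixes instead.
sealed interface ClientCommand {
    data class Subscribe(val topic: String) : ClientCommand
    data class Unsubscribe(val topic: String) : ClientCommand
    data class Chat(val text: String) : ClientCommand
}

// Maps a raw text frame to a command, using the same prefixes as the handlers.
fun parseClientCommand(payload: String): ClientCommand = when {
    payload.startsWith("subscribe:") ->
        ClientCommand.Subscribe(payload.substringAfter("subscribe:"))
    payload.startsWith("unsubscribe:") ->
        ClientCommand.Unsubscribe(payload.substringAfter("unsubscribe:"))
    else -> ClientCommand.Chat(payload)
}
```

The MVC and Reactive handlers could then share the parser and differ only in how they act on each command.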



Sending messages

MVC

Code Block
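import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession

fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) {
    val objectMapper = jacksonObjectMapper()
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    // Blocking send on the calling thread.
    session.sendMessage(TextMessage(jsonPayload))
}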


Reactive

Code Block
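import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession
import reactor.core.publisher.Mono

fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) {
    val objectMapper = jacksonObjectMapper()
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    val message = session.textMessage(jsonPayload)
    // Fire-and-forget: subscribe() triggers the send, and errors are not propagated to the caller.
    session.send(Mono.just(message)).subscribe()
}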
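
The reactive send function above subscribes on its own, so the calling pipeline never sees whether the send completed or failed. As a possible alternative, the function can return the `Mono<Void>` produced by `session.send(...)` and let the handler compose it. This is a sketch under assumptions, not the original project's code: the function name `sendReactiveEventTextMessageMono` is hypothetical, the payload is typed as `Any` to keep the block self-contained, and it needs spring-webflux and jackson-module-kotlin on the classpath.

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

// One shared mapper instead of building a new one for every message.
private val objectMapper = jacksonObjectMapper()

// Hypothetical variant: returns the send as a Mono so the caller decides when to subscribe.
fun sendReactiveEventTextMessageMono(session: WebSocketSession, payload: Any): Mono<Void> =
    Mono.fromCallable { objectMapper.writeValueAsString(payload) }
        .flatMap { json -> session.send(Mono.just(session.textMessage(json))) }
```

Inside the handler's `flatMap`, the `else` branch could return this Mono directly instead of `Mono.empty<Void>()`, so that send errors flow through the same pipeline that `doFinally` observes.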


WS endpoint configuration

MVC

Code Block
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

@Configuration
@EnableWebSocket
class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler,
                      private val actorWebSocketHandler: SocketHandlerWithActor,
                      private var socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
                      private val sessionManager: WebSocketSessionManager,
                      private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

    @Bean
    fun webSocketHandler() = MyWebSocketHandler(sessionManager)

    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        // Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler
        registry.addHandler(actorWebSocketHandler, "/ws-actor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for User
        registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}



Reactive

Code Block
package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer

@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {
    override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
        registry.addResourceHandler("/**")
            .addResourceLocations("classpath:/static/")
    }
}

package org.example.kotlinbootreactivelabs.ws

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService

@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
        return WebSocketHandlerAdapter(webSocketService())
    }

    @Bean
    fun webSocketService(): WebSocketService {
        return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
    }

    @Bean
    fun webSocketHandlerMapping(): HandlerMapping {
        val map = mapOf("/ws-reactive" to reactiveSocketHandler)
        val handlerMapping = SimpleUrlHandlerMapping()
        handlerMapping.order = 1
        handlerMapping.urlMap = map
        return handlerMapping
    }
}
  • WebFluxConfigurer : this configuration is needed to serve static web resources (testclient.html).
  • ReactiveWebSocketConfig : maps the reactive socket's WS endpoint to its handler.

SessionManager

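The SessionManager is the basic object for managing sockets: it handles connection management and provides the Pub/Sub and SendToSession functions.

It can be extended by connecting it to an external MQ.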

MVC

Code Block
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val sessions = ConcurrentHashMap<String, WebSocketSession>()

    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
        logger.info("Connected: ${session.id}")
    }

    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        logger.info("Disconnected: ${session.id}")
    }

    fun subscribeToTopic(sessionId: String, topic: String) {
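        // Use a concurrent set: mutableSetOf() is not safe for concurrent modification.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)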
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.let {
            sendEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let {
                sendEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}


Reactive

Code Block
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap


@Component
class WebSocketSessionManager {

    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()

    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()


    fun addSession(session: ReactiveWebSocketSession) {
        reactiveSessions[session.id] = session
        logger.info("[SessionManager] Connected: ${session.id}")

        sendReactiveEventTextMessage(session, EventTextMessage(
            type = MessageType.INFO,
            message = "You are connected - ${session.id}",
            from = MessageFrom.SYSTEM,
            id = null,
            jsondata = null,
        ))
    }

    fun removeSession(session: ReactiveWebSocketSession) {
        reactiveSessions.remove(session.id)
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }


    fun subscribeReactiveToTopic(sessionId: String, topic: String) {
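        // Use a concurrent set: mutableSetOf() is not safe for concurrent modification.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)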
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendReactiveMessageToSession(sessionId: String, message: String) {
        reactiveSessions[sessionId]?.let {
            sendReactiveEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendReactiveMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            reactiveSessions[sessionId]?.let {
                sendReactiveEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}
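
In both session managers above, removeSession drops the session from the session map but leaves its id in topicSubscriptions, so a disconnected session keeps appearing as a subscriber until it is explicitly unsubscribed. A minimal sketch of one way to handle this, in plain Kotlin: the class `TopicRegistry` and its method names are hypothetical and not part of the original project.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper class; not part of the original project.
class TopicRegistry {
    private val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun subscribe(sessionId: String, topic: String) {
        // A concurrent set, so handlers on different threads can modify it safely.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }
            .add(sessionId)
    }

    fun unsubscribe(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
    }

    // Intended to be called when a session closes: removes it from every topic.
    fun removeSession(sessionId: String) {
        topicSubscriptions.values.forEach { it.remove(sessionId) }
    }

    // Snapshot of the current subscribers of a topic.
    fun subscribers(topic: String): Set<String> =
        topicSubscriptions[topic]?.toSet() ?: emptySet()
}
```

Calling `removeSession(session.id)` on such a registry from the session manager's own removeSession would keep the topic map in step with the live connections.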


A test tool for pub/sub/SendToSession against the socket written in Reactive mode is also available, and the full code is included.
