This page compares the Reactive WebSocket support in WebFlux with Spring Boot's default (MVC) WebSocket, looking at how each is implemented and how each behaves at runtime.


Because of the major differences listed below, Reactive (WebFlux) mode is recommended when many WebSocket connections must be served efficiently with limited resources.

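  • Spring MVC

    • Uses the traditional thread-based model.
    • Each request occupies a thread, either newly created or taken from a thread pool.
    • Runs on blocking I/O.
    • WebSocket handlers are processed synchronously.
  • Spring WebFlux (Reactive)

    • Uses the reactive programming model.
    • Handles requests with asynchronous, non-blocking I/O.
    • Builds on Reactive Streams and the Publisher-Subscriber pattern.
    • WebSocket handlers are processed asynchronously, which provides high concurrency and scalability.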


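Can an MVC WebSocket and a Reactive WebSocket run side by side in one Spring Boot application, separated only by interface? After a number of tests, the answer is no:

because the two are driven differently at runtime (a thread pool for MVC, an event-loop dispatcher for Reactive), a Spring Boot application can run only one of them.

  • MVC WebSocket and Reactive WebSocket cannot run together in the same Spring Boot application. The same applies to REST APIs.
  • To run in Reactive mode, the application must start in Reactive (WebFlux) mode. The example here uses the @EnableWebFlux annotation; note that in Spring Boot this annotation also turns off Boot's WebFlux auto-configuration in favor of your own configuration classes.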

From here on, WebFlux is referred to as Reactive, and the implementation differences are described as MVC vs Reactive.


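Basic features to implement

Using the Reactive socket, we implement only the basics: local connection management, Pub/Sub over local sessions, and SendToSession.

  • Placing a local socket session manager as an intermediate layer keeps the choice of MQ open for later, when distributed state has to be managed.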
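
As a rough illustration of why such an intermediate layer keeps the MQ choice open, the sketch below hides message delivery behind a small interface. All names here (MessageBroker, TopicListener, LocalBroker) are hypothetical and do not come from the example project; a broker backed by an external MQ would be one more implementation of the same interface.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical names for illustration; none of these types exist in the original project.
fun interface TopicListener {
    fun onMessage(topic: String, message: String)
}

interface MessageBroker {
    fun publish(topic: String, message: String)
    fun register(listener: TopicListener)
}

// In-process broker: every published message is handed straight to the local listeners.
class LocalBroker : MessageBroker {
    private val listeners = CopyOnWriteArrayList<TopicListener>()

    override fun publish(topic: String, message: String) {
        listeners.forEach { it.onMessage(topic, message) }
    }

    override fun register(listener: TopicListener) {
        listeners.add(listener)
    }
}
```

Under this assumption the session manager would register itself as a listener and forward each message to its local sessions, so replacing LocalBroker with an MQ-backed implementation would not touch the socket handlers.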

Dependency differences

MVC

implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")

import org.springframework.web.socket.handler.TextWebSocketHandler
  • Uses the classes under web.socket


Reactive

implementation("org.springframework.boot:spring-boot-starter-webflux")

import org.springframework.web.reactive.socket.WebSocketHandler
  • Uses the classes under web.reactive.socket


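Caution: if both org.springframework.boot:spring-boot-starter-web and org.springframework.boot:spring-boot-starter-webflux are included, the build still succeeds,

but the application runs as MVC at runtime. The programming models also differ, so one cannot simply be switched to the other later.

Decide at project setup whether to go MVC or Reactive, and include only one of the two.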
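
The precedence described above can be written down as a small decision function. This is only a simplified model of the observed behavior, not Spring Boot's own code; WebMode and deduceWebMode are hypothetical names introduced for the illustration.

```kotlin
enum class WebMode { SERVLET, REACTIVE, NONE }

// Hypothetical helper that mirrors the precedence described above; not Spring Boot's own code.
fun deduceWebMode(hasMvcStarter: Boolean, hasWebFluxStarter: Boolean): WebMode = when {
    hasMvcStarter -> WebMode.SERVLET       // MVC wins whenever it is present
    hasWebFluxStarter -> WebMode.REACTIVE  // Reactive only when MVC is absent
    else -> WebMode.NONE                   // no web stack on the classpath
}
```

With both starters present, deduceWebMode(true, true) yields SERVLET, which matches the MVC-first behavior noted above. Spring Boot also has a spring.main.web-application-type property for setting the mode explicitly; that route was not part of the tests described here.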



Message model

enum class MessageType {
    CHAT,               //For Chat
    CHATBLOCK,          //For ChatBot Block
    PUSH,               //For Push Notification
    INFO, ERROR,        //For SystemMessage
    SESSIONID           //For Session ID Update
}

enum class MessageFrom {
    USER, COUNSELOR, SYSTEM
}

class EventTextMessage(
    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val type: MessageType,

    val message: String,

    @JsonFormat(shape = JsonFormat.Shape.STRING)
    val from: MessageFrom,

    var id: String? = null,
    val jsondata: String? = null
)
  • Agreeing on the message contract with the front end (.js) before starting makes message handling flexible.


SocketHandler implementation

MVC

package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import org.springframework.stereotype.Component

@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    override fun afterConnectionEstablished(session: WebSocketSession) {
        sessionManager.addSession(session)
    }

    override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) {
        sessionManager.removeSession(session)
    }

    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val payload = message.payload

        when {
            payload.startsWith("subscribe:") -> {
                val topic = payload.substringAfter("subscribe:")
                sessionManager.subscribeToTopic(session.id, topic)
            }
            payload.startsWith("unsubscribe:") -> {
                val topic = payload.substringAfter("unsubscribe:")
                sessionManager.unsubscribeFromTopic(session.id, topic)
            }
            else -> {
                sendEventTextMessage(session, EventTextMessage(
                    type = MessageType.CHAT,
                    message = "Echo: $payload",
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                ))
            }
        }
    }
}


Reactive

package org.example.kotlinbootreactivelabs.ws


import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

        return session.receive()
            .map { it.payloadAsText }
            .flatMap { payload ->
                when {
                    payload.startsWith("subscribe:") -> {
                        val topic = payload.substringAfter("subscribe:")
                        sessionManager.subscribeReactiveToTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    payload.startsWith("unsubscribe:") -> {
                        val topic = payload.substringAfter("unsubscribe:")
                        sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
                        Mono.empty<Void>()
                    }
                    else -> {
                        sendReactiveEventTextMessage(
                            session, EventTextMessage(
                                type = MessageType.CHAT,
                                message = "Echo: $payload",
                                from = MessageFrom.SYSTEM,
                                id = null,
                                jsondata = null,
                            )
                        )
                        Mono.empty<Void>()
                    }
                }
            }
            .then()
            .doFinally { sessionManager.removeSession(session) }
    }
}
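
Both handlers above parse the same text protocol ("subscribe:<topic>", "unsubscribe:<topic>", anything else is echoed). One possible way to keep that parsing in a single place is sketched below; ClientCommand and parseCommand are hypothetical names, not part of the example project.

```kotlin
// Hypothetical command model for the text protocol used by both handlers.
sealed interface ClientCommand {
    data class Subscribe(val topic: String) : ClientCommand
    data class Unsubscribe(val topic: String) : ClientCommand
    data class Chat(val text: String) : ClientCommand
}

// Maps a raw text frame to a command, using the same prefixes as the handlers above.
fun parseCommand(payload: String): ClientCommand = when {
    payload.startsWith("subscribe:") ->
        ClientCommand.Subscribe(payload.substringAfter("subscribe:"))
    payload.startsWith("unsubscribe:") ->
        ClientCommand.Unsubscribe(payload.substringAfter("unsubscribe:"))
    else -> ClientCommand.Chat(payload)
}
```

Each handler could then branch on the parsed command with an exhaustive when, so the MVC and Reactive versions differ only in how they send the reply.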



Sending messages

MVC

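import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

// Reuse one ObjectMapper; creating a new one for every message is expensive.
private val objectMapper = jacksonObjectMapper()

fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) {
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    // Blocking call: returns once the message has been handed to the container.
    session.sendMessage(TextMessage(jsonPayload))
}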


Reactive

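import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession

// Reuse one ObjectMapper; creating a new one for every message is expensive.
private val objectMapper = jacksonObjectMapper()

fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) {
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
    val message = session.textMessage(jsonPayload)
    // Fire-and-forget: nothing is written until subscribe() is called, and errors are not reported to the caller.
    session.send(Mono.just(message)).subscribe()
}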


WS endpoint configuration

MVC

package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

@Configuration
@EnableWebSocket
class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler,
                      private val actorWebSocketHandler: SocketHandlerWithActor,
                      private val socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
                      private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

    // MyWebSocketHandler is already a @Component, so it is injected above
    // rather than being declared a second time with @Bean.

    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        // Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler
        registry.addHandler(actorWebSocketHandler, "/ws-actor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for User
        registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}



Reactive

package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer

@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {
    override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
        registry.addResourceHandler("/**")
            .addResourceLocations("classpath:/static/")
    }
}

package org.example.kotlinbootreactivelabs.ws

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService

@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
        return WebSocketHandlerAdapter(webSocketService())
    }

    @Bean
    fun webSocketService(): WebSocketService {
        return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
    }

    @Bean
    fun webSocketHandlerMapping(): HandlerMapping {
        val map = mapOf("/ws-reactive" to reactiveSocketHandler)
        val handlerMapping = SimpleUrlHandlerMapping()
        handlerMapping.order = 1
        handlerMapping.urlMap = map
        return handlerMapping
    }
}
  • WebFluxConfigurer : needed when serving static web resources ( testclient.html ).
  • ReactiveWebSocketConfig : registers the reactive socket's WS endpoint together with its handler.

SessionManager

The session manager is the base object for socket management: it tracks connections and provides PUB/SUB and SendToSession.

It can later be extended by connecting it to an external MQ.

MVC

package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val sessions = ConcurrentHashMap<String, WebSocketSession>()

    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
        logger.info("Connected: ${session.id}")
    }

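    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        // Also drop the session from every topic so stale ids do not accumulate.
        topicSubscriptions.values.forEach { it.remove(session.id) }
        logger.info("Disconnected: ${session.id}")
    }

    fun subscribeToTopic(sessionId: String, topic: String) {
        // Use a concurrent set: a plain mutableSetOf() is not safe for concurrent access.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }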

    fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.let {
            sendEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let {
                sendEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}


Reactive

package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap


@Component
class WebSocketSessionManager {

    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()

    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()


    fun addSession(session: ReactiveWebSocketSession) {
        reactiveSessions[session.id] = session
        logger.info("[SessionManager] Connected: ${session.id}")

        sendReactiveEventTextMessage(session, EventTextMessage(
            type = MessageType.INFO,
            message = "You are connected - ${session.id}",
            from = MessageFrom.SYSTEM,
            id = null,
            jsondata = null,
        ))
    }

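    fun removeSession(session: ReactiveWebSocketSession) {
        reactiveSessions.remove(session.id)
        // Also drop the session from every topic so stale ids do not accumulate.
        topicSubscriptions.values.forEach { it.remove(session.id) }
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }


    fun subscribeReactiveToTopic(sessionId: String, topic: String) {
        // Use a concurrent set: a plain mutableSetOf() is not safe for concurrent access.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }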

    fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendReactiveMessageToSession(sessionId: String, message: String) {
        reactiveSessions[sessionId]?.let {
            sendReactiveEventTextMessage(
                it, EventTextMessage(
                    type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendReactiveMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            reactiveSessions[sessionId]?.let {
                sendReactiveEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}
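
The topic bookkeeping shared by both session managers can be tried out without Spring. The sketch below is a minimal stand-in under that assumption: TopicRegistry is a hypothetical name, and it stores only session ids, leaving the actual sending to the caller.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-alone version of the topic bookkeeping; it tracks session ids only.
class TopicRegistry {
    private val subscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun subscribe(sessionId: String, topic: String) {
        // newKeySet() gives a set that tolerates concurrent add/remove.
        subscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
    }

    fun unsubscribe(sessionId: String, topic: String) {
        subscriptions[topic]?.remove(sessionId)
    }

    // Called on disconnect so a closed session is removed from every topic.
    fun removeSession(sessionId: String) {
        subscriptions.values.forEach { it.remove(sessionId) }
    }

    // Snapshot of the current subscribers of a topic.
    fun subscribers(topic: String): Set<String> = subscriptions[topic]?.toSet() ?: emptySet()
}
```

A sendMessageToTopic implementation would iterate over subscribers(topic) and look each id up in the session map, as the managers above do.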


A test tool for the pub/sub and SendToSession features of the Reactive socket is also available, and the full source code is included.


