Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

MVC WEBSOCKET 은 이전 장에서 웹소켓을 생성하고 운영하는 방식을 살펴보면 되며

여기서는 WebFlux가 지원하는 ReactiveWebSocket을 살펴보겠습니다WebFlux가 지원하는 ReactiveWebSocket을  Spring Boot 기본 Websocket(MVC)과 구현방식과 작동방식의 차이를 알아보겠습니다.


다음과 같은 큰 차이가 있기때문에, 적은 자원으로 웹소켓 커넥션 고성능 효율을 가질려면 가지려면 Reactive-WebFlux모드가 권장됩니다.

  • Spring MVC

    • 전통적인 스레드 기반 모델을 사용합니다.
    • 요청마다 새로운 스레드를 생성하거나 스레드 풀에서 가져옵니다.
    • 블로킹 I/O를 기반으로 동작합니다.
    • WebSocket 핸들러는 동기적으로 처리됩니다.
  • Spring WebFlux ( Reactive)

    • 리액티브 프로그래밍 모델을 사용합니다.
    • 요청 처리 시 비동기 및 논블로킹 I/O를 기반으로 동작합니다.
    • 리액티브 스트림과 Publisher-Subscriber 패턴을 활용합니다.
    • WebSocket 핸들러는 비동기적으로 처리되며, 이를 통해 높은 동시성과 확장성을 제공합니다.

삽질방지 팁


혹시나 인터페이스 구분으로 MVC웹소켓과 Reactive웹소켓이 SpringBoot내에 함께 운영될수 있나? 다양한 테스트를 수행해보았지만 

스프링부트의 런타임시 결정된 1가지 방식만 지원하기때문에 불가능합니다런타임시  스레드풀(Mvc)/디스패처(Reactive) 다른방식으로 운영되기때문에 스트링부트 런타임시 한가지 방식만 운영가능합니다.

  • MVC WebSocket과 Reactive WebSocket을 동일 SpringBoot에서 함께 작동시킬수 없습니다. 이것은 RestAPI도 마찬가지입니다.
  • Reactive 모드로 작동시키기위해 SpriingBoot의 초기 작동모드가 Reactive-WebFlux여야합니다.  - EnableWebFlux 어노테이션이 어노테이션 이용됨

이후 MVC와 차이를 Webplux를 Reactive 로 차이점을 설명을 진행

의존요소 차이

MVC

규정하고 MVC VS REACTIVE 구현방법 차이점 설명


구현기본기능

로컬에서 기본적인 커넥션관리를 하고 로컬세션 PubSub지원및 SendToSession 을할수 있는 기본적인 기능만 ReactiveSocket을 이용해 시도해보겠습니다.

draw.io Board Diagram
bordertrue
diagramNamewsneopusub
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth953
revision1

  • Local SocketSession Manage 중간 계층을 둠으로~ 분산상태를 관리할때 추후 다양한 MQ를 선택할수 있습니다.

의존요소 차이

MVC

Code Block
theme
Code Block
themeEmacs
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-websocket")

import org.springframework.web.socket.handler.TextWebSocketHandler

...

  • web.reactive.socket 하위항목 사용

SocketHandler 구현

MVC


Warning

주의 org.springframework.boot:spring-boot-starter-web , org.springframework.boot:spring-boot-starter-webflux 둘다 포함시켜도 빌드는 잘작동되지만

런타임은 MVC우선으로 작동됩니다.  프로그래밍 방식도 다르기때문에 전환되는형태가 아니니

프로젝트 구성시 둘중 하나만 채택해 MVC/Reactive 모드로 갈지 초기에 선택합니다.



메시지 모델

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import org.springframework.stereotype.Component

@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    override fun afterConnectionEstablished(session: WebSocketSession) {
enum class MessageType {
    CHAT,               //For Chat
    CHATBLOCK,          //For  sessionManager.addSession(session)ChatBot Block
    }

PUSH,    override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) {
     //For   sessionManager.removeSession(session)Push Notification
    }

 INFO, ERROR,   override fun handleTextMessage(session: WebSocketSession, message: TextMessage) { //For SystemMessage
    SESSIONID    val payload = message.payload

    //For Session   when ID Update
}

enum class MessageFrom {
    USER, COUNSELOR, SYSTEM
}

class EventTextMessage(
    @JsonFormat(shape = payload.startsWith("subscribe:") -> {JsonFormat.Shape.STRING)
    val type: MessageType,

    val message: String,

    val@JsonFormat(shape topic = payload.substringAfter("subscribe:"JsonFormat.Shape.STRING)
    val   from: MessageFrom,

    var id: String?   sessionManager.subscribeToTopic(session.id, topic)= null,
    val jsondata: String? = null
)
  • 프론트(.js)와 메시지 규약을 맞추고 시작함으로 유연한 메시지처리를 할수 있게됩니다.


SocketHandler 구현

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.TextMessage
import org.springframework.web.socket.WebSocketSession
import org.springframework.web.socket.handler.TextWebSocketHandler
import org.springframework.stereotype.Component

@Component
class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() {

    override fun afterConnectionEstablished(session: WebSocketSession) {
    }
            payload.startsWith("unsubscribe:") -> {
                val topic = payload.substringAfter("unsubscribe:")
                sessionManager.unsubscribeFromTopic(session.id, topic)
            }
            else -> {
                sendEventTextMessagesessionManager.addSession(session, EventTextMessage()
    }

    override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) {
      type = MessageType.CHAT,sessionManager.removeSession(session)
    }

    override fun handleTextMessage(session: WebSocketSession, message: TextMessage) {
        val messagepayload = "Echo: $payload",message.payload

        when {
           from = MessageFrom.SYSTEM,
  payload.startsWith("subscribe:") -> {
                val topic id = null,payload.substringAfter("subscribe:")
                sessionManager.subscribeToTopic(session.id, topic)
    jsondata   = null,
    }
            ))
payload.startsWith("unsubscribe:") -> {
                 }val topic = payload.substringAfter("unsubscribe:")
        }
       }
}

Reactive

Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.ws


import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
 sessionManager.unsubscribeFromTopic(session.id, topic)
            }
            else -> {
                 sessionManager.addSession(session)

sendEventTextMessage(session, EventTextMessage(
        return session.receive()
           type .map { it.payloadAsText }= MessageType.CHAT,
            .flatMap { payload ->
     message = "Echo: $payload",
        when {
           from = MessageFrom.SYSTEM,
       payload.startsWith("subscribe:") -> {
           id             val topic = payload.substringAfter("subscribe:")= null,
                    jsondata    sessionManager.subscribeReactiveToTopic(session.id, topic)= null,
                        Mono.empty<Void>())
            }
        }
    }
}


Reactive

Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.ws


import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {
                payload.startsWith("unsubscribe:") -> {
    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

       val topicreturn = payloadsession.substringAfterreceive("unsubscribe:")
            .map { it.payloadAsText }
           sessionManager.unsubscribeReactiveFromTopic(session.id, topic) .flatMap { payload ->
                when {
       Mono.empty<Void>()
             payload.startsWith("subscribe:") -> {
     }
                   val elsetopic -> {= payload.substringAfter("subscribe:")
                        sendReactiveEventTextMessage(sessionManager.subscribeReactiveToTopic(session.id, topic)
                            session, EventTextMessage(Mono.empty<Void>()
                    }
             type = MessageType.CHAT,
     payload.startsWith("unsubscribe:") -> {
                        val messagetopic = payload.substringAfter("Echounsubscribe: $payload",)
                        sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
       from = MessageFrom.SYSTEM,
                Mono.empty<Void>()
                 id  = null,}
                    else -> {
          jsondata = null,
            sendReactiveEventTextMessage(
                )
            session, EventTextMessage(
           )
                     type =  Mono.empty<Void>()
MessageType.CHAT,
                     }
           message = "Echo: $payload",
     }
                }
           from .then()= MessageFrom.SYSTEM,
            .doFinally { sessionManager.removeSession(session) }
                 }
}

WS EndPoint Config설정

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

@Configuration
@EnableWebSocket
class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler,
 id = null,
                                jsondata = null,
  private val actorWebSocketHandler: SocketHandlerWithActor,
                      private var)
 socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
                      private)
 val sessionManager: WebSocketSessionManager,
                     Mono.empty<Void>()
 private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

          @Bean
    fun webSocketHandler() = MyWebSocketHandler(sessionManager)}

    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        //}
 Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws") }
            .addInterceptorsthen(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

doFinally { sessionManager.removeSession(session) }
        // Actor WebSocket handler}
}



전송방법

MVC

Code Block
themeEmacs
fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) {
    val objectMapper =  registry.addHandler(actorWebSocketHandler, "/ws-actor"jacksonObjectMapper()
    val jsonPayload = objectMapper.writeValueAsString(eventTextMessage)
     session.addInterceptorssendMessage(HttpSessionHandshakeInterceptorTextMessage(jsonPayload))
}


REACTIVE

Code Block
themeEmacs
fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) {
    val objectMapper = .setAllowedOriginsjacksonObjectMapper("*")

    val jsonPayload   // Actor WebSocket handler for User= objectMapper.writeValueAsString(eventTextMessage)
    val message =  registrysession.addHandler(socketHandlerForPersnalRoom, "/ws-user"textMessage(jsonPayload)
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}


Reactive

session.send(Mono.just(message)).subscribe()
}


WS EndPoint Config설정

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactivesocket.config.annotation.EnableWebFluxWebSocketConfigurer
import org.springframework.web.reactivesocket.config.ResourceHandlerRegistryannotation.WebSocketHandlerRegistry
import org.springframework.web.socket.reactiveserver.configsupport.WebFluxConfigurerHttpSessionHandshakeInterceptor

@Configuration
@EnableWebFlux@EnableWebSocket
class WebFluxConfig : WebFluxConfigurer {
WebSocketConfig(private val webSocketHandler: MyWebSocketHandler,
                      overrideprivate funval addResourceHandlers(registry: ResourceHandlerRegistry) {
actorWebSocketHandler: SocketHandlerWithActor,
            registry.addResourceHandler("/**")
          private var .addResourceLocations("classpath:/static/")socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
    }
}

package org.example.kotlinbootreactivelabs.ws

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService

@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
                  private val sessionManager: WebSocketSessionManager,
                      private val return WebSocketHandlerAdapter(webSocketService())
    }socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

    @Bean
    fun webSocketServicewebSocketHandler(): WebSocketService {
        return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
    }

= MyWebSocketHandler(sessionManager)

    @Bean
override    fun webSocketHandlerMappingregisterWebSocketHandlers()registry: HandlerMappingWebSocketHandlerRegistry) {
        val map = mapOf(// Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws-reactive" to reactiveSocketHandler)
        val  handlerMapping = SimpleUrlHandlerMapping.addInterceptors(HttpSessionHandshakeInterceptor())
        handlerMapping.order = 1
   .setAllowedOrigins("*")

     handlerMapping.urlMap = map
 // Actor WebSocket handler
    return handlerMapping
    }
}

SessionManager

커넥션 관리를 하고 PUB/SUB및 SendToSession 이 있는 소켓을 관리하는 기본 정의객체

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val sessions = ConcurrentHashMap<String, WebSocketSession>()

    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: WebSocketSession) {registry.addHandler(actorWebSocketHandler, "/ws-actor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for User
        registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
        sessions[session.id] = session
        logger.info("Connected: ${session.id}.setAllowedOrigins("*")
    }

    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        logger.info("Disconnected: ${session.id}")
    }

    fun subscribeToTopic(sessionId: String, topic: String) }



Reactive

Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.config

import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.config.ResourceHandlerRegistry
import org.springframework.web.reactive.config.WebFluxConfigurer

@Configuration
@EnableWebFlux
class WebFluxConfig : WebFluxConfigurer {
    override fun addResourceHandlers(registry: ResourceHandlerRegistry) {
        topicSubscriptionsregistry.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId)
addResourceHandler("/**")
            logger.info("Session $sessionId subscribed to topic $topicaddResourceLocations("classpath:/static/")
    }
}

package org.example.kotlinbootreactivelabs.ws

import   fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.letorg.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.HandlerMapping
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy
import org.springframework.web.reactive.socket.server.WebSocketService
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService

@Configuration
@EnableWebFlux
class ReactiveWebSocketConfig(private val reactiveSocketHandler: ReactiveSocketHandler) {

    @Bean
    fun webSocketHandlerAdapter(): WebSocketHandlerAdapter {
            sendEventTextMessage(return WebSocketHandlerAdapter(webSocketService())
    }

    @Bean
    fun webSocketService():   it, EventTextMessage(WebSocketService {
        return HandshakeWebSocketService(ReactorNettyRequestUpgradeStrategy())
    }

       type = MessageType.PUSH,@Bean
    fun webSocketHandlerMapping(): HandlerMapping {
        val map = mapOf("/ws-reactive"  message = message,to reactiveSocketHandler)
        val handlerMapping = SimpleUrlHandlerMapping()
        handlerMapping.order from = MessageFrom.SYSTEM,1
        handlerMapping.urlMap = map
        return handlerMapping
 id = null,
                    jsondata = null,
                )
            )
        }
}
  • WebFluxConfigurer : static web 리소스를 이용할때 설정이 필요합니다. ( testclient.html )
  • ReactiveWebSocketConfig : reactive socket의 ws endpoint를 핸들러와 함께 설정합니다.

SessionManager

커넥션 관리를 기본적으로 하고 PUB/SUB및 SendToSession 기능이 있는 소켓을 관리하는 기본 객체로~

외부 MQ와 연결해 확장해 나갈수 있습니다. 

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

@Component
class WebSocketSessionManager    }

    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val sessions = ConcurrentHashMap<String, sendEventTextMessageWebSocketSession>()

    val topicSubscriptions               it, EventTextMessage(
= ConcurrentHashMap<String, MutableSet<String>>()

    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
       type = MessageType.PUSH, logger.info("Connected: ${session.id}")
    }

    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
     message = message,
   logger.info("Disconnected: ${session.id}")
    }

    fun subscribeToTopic(sessionId: String, topic: String) {
         from = MessageFrom.SYSTEM,topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic  $topic")
    }

    fun unsubscribeFromTopic(sessionId: String, idtopic: =String) null,{
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
   jsondata = null, }

    fun sendMessageToSession(sessionId: String, message: String) {
           )sessions[sessionId]?.let {
            sendEventTextMessage(
    )
            it, }
EventTextMessage(
             }
        }
}

Reactive

Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.ws

import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap


@Component
class WebSocketSessionManager {

    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()

    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()


    fun addSession(session: ReactiveWebSocketSession) {
type = MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    reactiveSessions[session.id] = sessionnull,
        logger.info("[SessionManager] Connected: ${session.id}")

            jsondata = sendReactiveEventTextMessage(sessionnull,
 EventTextMessage(
             type = MessageType.INFO,)
            message)
   = "You are connected - ${session.id}",
    }

    fun sendMessageToTopic(topic: String, message: from = MessageFrom.SYSTEM,String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
    id = null,
      sessions[sessionId]?.let {
     jsondata  = null,
        ))sendEventTextMessage(
    }

      fun removeSession(session: ReactiveWebSocketSession) {
       it, reactiveSessions.remove(session.id)EventTextMessage(
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }


    fun subscribeReactiveToTopic(sessionId: String, topic: String) {      type = MessageType.PUSH,
        topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId)
             logger.info("Session $sessionId subscribed tomessage topic $topic")= message,
    }

       fun unsubscribeReactiveFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)from = MessageFrom.SYSTEM,
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    fun sendReactiveMessageToSession(sessionId: String, message:id String)= {null,
        reactiveSessions[sessionId]?.let {
            sendReactiveEventTextMessage(
   jsondata = null,
           it, EventTextMessage(
        )
            type = MessageType.PUSH,
  )
            }
      message = message,}
    }
}


Reactive

Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.ws

import               from = MessageFrom.SYSTEM,
                    id = null,org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.web.reactive.socket.WebSocketSession as ReactiveWebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap


@Component
class WebSocketSessionManager {

    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    val reactiveSessions = ConcurrentHashMap<String, ReactiveWebSocketSession>()

    val        jsondata topicSubscriptions = nullConcurrentHashMap<String, MutableSet<String>>()


    fun addSession(session: ReactiveWebSocketSession) {
         )reactiveSessions[session.id] = session
            )
logger.info("[SessionManager] Connected: ${session.id}")

        }
    }

sendReactiveEventTextMessage(session, EventTextMessage(
    fun sendReactiveMessageToTopic(topic: String, message: String) {
   type = MessageType.INFO,
   topicSubscriptions[topic]?.forEach { sessionId ->
      message = "You are connected  reactiveSessions[sessionId]?.let {- ${session.id}",
            from    sendReactiveEventTextMessage(= MessageFrom.SYSTEM,
            id = null,
      it, EventTextMessage(
     jsondata = null,
        ))
    }

    fun type = MessageType.PUSH,removeSession(session: ReactiveWebSocketSession) {
        reactiveSessions.remove(session.id)
        logger.info("[SessionManager] Disconnected: ${session.id}")
    }


  message = message,
     fun subscribeReactiveToTopic(sessionId: String, topic: String) {
        topicSubscriptions.computeIfAbsent(topic) {          from = MessageFrom.SYSTEM,mutableSetOf() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    fun unsubscribeReactiveFromTopic(sessionId: String, idtopic: =String) null,{
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    jsondata = null,}

    fun sendReactiveMessageToSession(sessionId: String, message: String) {
        reactiveSessions[sessionId]?.let {
           ) sendReactiveEventTextMessage(
                it, )EventTextMessage(
            }
        }
type    }
}= MessageType.PUSH,
                    message = message,
                    from = MessageFrom.SYSTEM,
                    id = null,
                    jsondata = null,
                )
            )
        }
    }

    fun sendReactiveMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            reactiveSessions[sessionId]?.let {
                sendReactiveEventTextMessage(
                    it, EventTextMessage(
                        type = MessageType.PUSH,
                        message = message,
                        from = MessageFrom.SYSTEM,
                        id = null,
                        jsondata = null,
                    )
                )
            }
        }
    }
}


Reactive 모드로 작성된 소켓   pub/sub/sentoSession 위한 TEST 툴도 준비되어있으며 전체코드도 포함되어있습니다.

Image Added