Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
themeEmacs
package org.example.kotlinbootreactivelabs.ws


import org.example.kotlinbootreactivelabs.ws.model.EventTextMessage
import org.example.kotlinbootreactivelabs.ws.model.MessageFrom
import org.example.kotlinbootreactivelabs.ws.model.MessageType
import org.example.kotlinbootreactivelabs.ws.model.sendReactiveEventTextMessage
import org.springframework.stereotype.Component
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono

/**
 * Reactive WebSocket handler that registers each connection with the
 * [WebSocketSessionManager] and interprets inbound text frames as simple commands.
 *
 * Recognised payloads:
 * - `subscribe:<topic>`   — subscribes this session's id to `<topic>`.
 * - `unsubscribe:<topic>` — removes this session's id from `<topic>`.
 * - anything else         — answered with a `CHAT` message from `SYSTEM` whose text is
 *   `Echo: <payload>`.
 */
@Component
class ReactiveSocketHandler(private val sessionManager: WebSocketSessionManager) : WebSocketHandler {

    /**
     * Registers [session], processes every inbound frame, and unregisters the session
     * once the inbound stream terminates for any reason.
     *
     * @param session the WebSocket session being handled.
     * @return a [Mono] that terminates when the inbound stream does.
     */
    override fun handle(session: WebSocketSession): Mono<Void> {
        sessionManager.addSession(session)

        val inboundTexts = session.receive().map { frame -> frame.payloadAsText }

        return inboundTexts
            .flatMap { text -> dispatch(session, text) }
            .then()
            .doFinally { sessionManager.removeSession(session) }
    }

    /**
     * Applies a single inbound payload. Always yields an empty [Mono]: the work is done
     * as a side effect, and whatever the session manager or the send helper return is
     * discarded.
     */
    private fun dispatch(session: WebSocketSession, text: String): Mono<Void> {
        if (text.startsWith(SUBSCRIBE_PREFIX)) {
            val topic = text.removePrefix(SUBSCRIBE_PREFIX)
            sessionManager.subscribeReactiveToTopic(session.id, topic)
        } else if (text.startsWith(UNSUBSCRIBE_PREFIX)) {
            val topic = text.removePrefix(UNSUBSCRIBE_PREFIX)
            sessionManager.unsubscribeReactiveFromTopic(session.id, topic)
        } else {
            val echo = EventTextMessage(
                type = MessageType.CHAT,
                message = "Echo: $text",
                from = MessageFrom.SYSTEM,
                id = null,
                jsondata = null,
            )
            sendReactiveEventTextMessage(session, echo)
        }
        return Mono.empty()
    }

    private companion object {
        /** Command prefix that introduces a topic subscription request. */
        const val SUBSCRIBE_PREFIX = "subscribe:"

        /** Command prefix that introduces a topic unsubscription request. */
        const val UNSUBSCRIBE_PREFIX = "unsubscribe:"
    }
}



전송 방법

MVC

Code Block
themeEmacs
/**
 * JSON mapper shared by every [sendEventTextMessage] call.
 *
 * Building a Jackson mapper is comparatively expensive, so it is created once here
 * instead of once per outgoing message.
 */
private val mvcEventMessageMapper = jacksonObjectMapper()

/**
 * Serialises [eventTextMessage] to JSON and sends it to [session] as a text frame.
 *
 * Exceptions raised by serialisation or by `sendMessage` are not caught here and
 * propagate to the caller.
 *
 * @param session the WebSocket session that receives the message.
 * @param eventTextMessage the message to serialise and send.
 */
fun sendEventTextMessage(session: WebSocketSession, eventTextMessage: EventTextMessage) {
    val jsonPayload = mvcEventMessageMapper.writeValueAsString(eventTextMessage)
    session.sendMessage(TextMessage(jsonPayload))
}


REACTIVE

Code Block
themeEmacs
/**
 * JSON mapper shared by every [sendReactiveEventTextMessage] call.
 *
 * Building a Jackson mapper is comparatively expensive, so it is created once here
 * instead of once per outgoing message.
 */
private val reactiveEventMessageMapper = jacksonObjectMapper()

/**
 * Serialises [eventTextMessage] to JSON and sends it to [session] as a text message.
 *
 * The send is fire-and-forget: the publisher returned by `session.send` is subscribed
 * immediately and this function returns without waiting for completion.
 *
 * NOTE(review): no error callback is passed to `subscribe()`, so a failed send is not
 * reported to the caller — confirm that is acceptable for every call site.
 *
 * @param session the reactive WebSocket session that receives the message
 *   (presumably an alias of the WebFlux `WebSocketSession` — TODO confirm).
 * @param eventTextMessage the message to serialise and send.
 */
fun sendReactiveEventTextMessage(session: ReactiveWebSocketSession, eventTextMessage: EventTextMessage) {
    val jsonPayload = reactiveEventMessageMapper.writeValueAsString(eventTextMessage)
    val message = session.textMessage(jsonPayload)
    session.send(Mono.just(message)).subscribe()
}


WS Endpoint Config 설정

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.SocketHandleForCounselor
import com.example.kotlinbootlabs.ws.handler.auth.SocketHandlerForPersnalRoom
import com.example.kotlinbootlabs.ws.handler.basic.SocketHandlerWithActor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.socket.config.annotation.EnableWebSocket
import org.springframework.web.socket.config.annotation.WebSocketConfigurer
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor

/**
 * Spring MVC WebSocket configuration that maps each handler to its endpoint path.
 *
 * Registered endpoints:
 * - `/ws`           — [MyWebSocketHandler]
 * - `/ws-actor`     — [SocketHandlerWithActor]
 * - `/ws-user`      — [SocketHandlerForPersnalRoom]
 * - `/ws-counselor` — [SocketHandleForCounselor]
 *
 * Every endpoint adds an [HttpSessionHandshakeInterceptor] and allows all origins.
 *
 * NOTE(review): the constructor receives a [MyWebSocketHandler] while this class also
 * declares a `@Bean` producing one. Confirm this does not create a circular bean
 * reference or a duplicate bean in the application context.
 */
@Configuration
@EnableWebSocket
class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler,
                      private val actorWebSocketHandler: SocketHandlerWithActor,
                      // Never reassigned, so it is a read-only property like its siblings.
                      private val socketHandlerForPersnalRoom: SocketHandlerForPersnalRoom,
                      private val sessionManager: WebSocketSessionManager,
                      private val socketHandlerForCounselor: SocketHandleForCounselor
) : WebSocketConfigurer {

    /** Exposes a [MyWebSocketHandler] built on the shared [WebSocketSessionManager]. */
    @Bean
    fun webSocketHandler() = MyWebSocketHandler(sessionManager)

    /**
     * Registers all WebSocket handlers with [registry].
     *
     * SECURITY(review): `setAllowedOrigins("*")` accepts handshakes from any origin.
     * Restrict it to the known front-end origins before exposing these endpoints
     * outside a trusted network.
     *
     * @param registry the registry that collects handler-to-path mappings.
     */
    override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {
        // Local WebSocket handler
        registry.addHandler(webSocketHandler, "/ws")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler
        registry.addHandler(actorWebSocketHandler, "/ws-actor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for User
        registry.addHandler(socketHandlerForPersnalRoom, "/ws-user")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")

        // Actor WebSocket handler for Counselor
        registry.addHandler(socketHandlerForCounselor, "/ws-counselor")
            .addInterceptors(HttpSessionHandshakeInterceptor())
            .setAllowedOrigins("*")
    }
}


...

커넥션을 관리하고 PUB/SUB 및 SendToSession 기능을 제공하는, 소켓 관리용 기본 객체 정의

MVC

Code Block
themeEmacs
package com.example.kotlinbootlabs.ws

import com.example.kotlinbootlabs.ws.handler.auth.EventTextMessage
import com.example.kotlinbootlabs.ws.handler.auth.MessageFrom
import com.example.kotlinbootlabs.ws.handler.auth.MessageType
import com.example.kotlinbootlabs.ws.handler.auth.sendEventTextMessage
import org.springframework.web.socket.WebSocketSession

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.util.concurrent.ConcurrentHashMap

/**
 * Tracks open WebSocket sessions and topic subscriptions, and pushes system messages
 * either to a single session or to every subscriber of a topic.
 *
 * Both registries are concurrent maps, and the per-topic subscriber sets are concurrent
 * sets, so sessions may be added, removed, subscribed and messaged from different
 * threads.
 */
@Component
class WebSocketSessionManager {
    private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java)

    /** Open sessions keyed by session id. */
    val sessions = ConcurrentHashMap<String, WebSocketSession>()

    /** Subscriber session ids keyed by topic name. */
    val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>()

    /**
     * Registers [session] under its id, replacing any previous entry with the same id.
     *
     * @param session the newly connected session.
     */
    fun addSession(session: WebSocketSession) {
        sessions[session.id] = session
        logger.info("Connected: ${session.id}")
    }

    /**
     * Unregisters [session] and drops its id from every topic, so that subscriptions do
     * not accumulate stale ids for connections that no longer exist.
     *
     * @param session the session that disconnected.
     */
    fun removeSession(session: WebSocketSession) {
        sessions.remove(session.id)
        topicSubscriptions.values.forEach { subscribers -> subscribers.remove(session.id) }
        logger.info("Disconnected: ${session.id}")
    }

    /**
     * Subscribes [sessionId] to [topic], creating the topic on first use.
     *
     * @param sessionId id of the subscribing session.
     * @param topic name of the topic to subscribe to.
     */
    fun subscribeToTopic(sessionId: String, topic: String) {
        // A concurrent set keeps add/remove/iterate safe when several threads touch
        // the same topic; a plain mutable set inside a concurrent map is not.
        topicSubscriptions.computeIfAbsent(topic) { ConcurrentHashMap.newKeySet<String>() }.add(sessionId)
        logger.info("Session $sessionId subscribed to topic $topic")
    }

    /**
     * Removes [sessionId] from [topic]; does nothing if the topic is unknown.
     *
     * @param sessionId id of the unsubscribing session.
     * @param topic name of the topic to unsubscribe from.
     */
    fun unsubscribeFromTopic(sessionId: String, topic: String) {
        topicSubscriptions[topic]?.remove(sessionId)
        logger.info("Session $sessionId unsubscribed from topic $topic")
    }

    /**
     * Sends [message] as a `PUSH` from `SYSTEM` to the session with [sessionId].
     * Silently does nothing when no such session is registered.
     *
     * @param sessionId id of the target session.
     * @param message text to deliver.
     */
    fun sendMessageToSession(sessionId: String, message: String) {
        sessions[sessionId]?.let { target -> pushSystemMessage(target, message) }
    }

    /**
     * Sends [message] as a `PUSH` from `SYSTEM` to every registered session subscribed
     * to [topic]. Subscriber ids without a registered session are skipped.
     *
     * An exception raised while sending to one subscriber is not caught here, so it
     * stops delivery to the remaining subscribers and propagates to the caller.
     *
     * @param topic name of the topic whose subscribers receive the message.
     * @param message text to deliver.
     */
    fun sendMessageToTopic(topic: String, message: String) {
        topicSubscriptions[topic]?.forEach { sessionId ->
            sessions[sessionId]?.let { target -> pushSystemMessage(target, message) }
        }
    }

    /** Builds the system `PUSH` envelope for [message] and sends it to [session]. */
    private fun pushSystemMessage(session: WebSocketSession, message: String) {
        sendEventTextMessage(
            session, EventTextMessage(
                type = MessageType.PUSH,
                message = message,
                from = MessageFrom.SYSTEM,
                id = null,
                jsondata = null,
            )
        )
    }
}

...