Reactive 모드의 웹소켓을 활용해 PubSub 기능을 액터모델을 통해 연결 하는것을 간략하게 알아보고

실시간 이벤트가 작동하는 모듈을 중단시키지 않고 블락없이 최종 수신 검증을 하는 방법을 알아보겠습니다.

Reactive Websocket with Actor 이벤트 흐름표

  • WebClient : 브라우저에서 웹소켓 이벤트를 발생시킵니다.
  • SocketHandler : 서버에서 웹소켓 이벤트를 수신받을수 있습니다. 수신받은 이벤트를 액터모델에 전달합니다.
  • UserSessionActor : 웹소켓 세션을 액터모델을 통해 관리합니다.


웹소켓이라던지 핸들러라던지 개발용어가 가득합니다.  구현된 모델을 도메인 언어를 사용해 이벤트 스토밍으로 풀어보겠습니다.


이벤트 스토밍으로 풀어본 PUBSUB 과정

와이어 프레임만 그려 빈약한 기획문서 작성에 대부분 시간을 사용하는 과정을 가속화하는 방법으로 DDD에이벤트 스토밍편에서 소개되는 내용으로

데이터중심이 아닌, 이벤트 중심 설계 특히 액터모델에 도메인을 녹이고자할때 도입되는 기법으로 , 개인해석에 의한 간소화 버전으로 이용해 보았습니다.


접속
토픽구독
토픽발행
접속종료
클라이언트
클라이언트
서버
접속을
시도한다
특정토픽을
구독한다
특정토픽을
발행한다
토픽수신
클라이언트
수신받은
토픽을
표현한다.
서버
자원을
정리한다.
사용자기준 접속 세션은 마지막 세션을 유효세션으로만 인정해 단일 로그인만 지원한다.

VS

멀티세션을 모두 인정해 N스크린에 이용할수 있게한다.
한 사용자는 N개 이내의 토픽만 구독할수 있다.

특정 토픽발생시 구독한 사용자에게 모두 전달되어야한다.

-구독한 토픽을 수신할수 있어야한다.
-구독한 메시지는 팝업형태로 메시지가 표현된다.

-명시적 종료일시 세션을 정리하고 토픽을 구독 해제한다.
-비정상 종요일시 세션만 정리한다. 

이 방식의 디테일을 연구하고 싶으면 다음 링크 추가참고 



구현된 코드의 작동에 대한 설명은 이러하고 리액티브 웹소켓+액터모델을 통해 PUBSUB 구현된 이벤트 처리기 인터페이를

간략하게 먼저 살펴보고 RealTime으로 작동중인 로직코드의 유닛테스트 수행해보겠습니다.

UserSessionManager Actor

전체 코드 : https://github.com/psmon/kopring-reactive-labs/blob/main/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/ws/actor/chat/CounselorManagerActor.kt

UserSessionManaver Actor는 웹소켓의 세션을 관리하고 PubSub이벤트를 중계처리합니다.


SocketHandler

전체코드 : https://github.com/psmon/kopring-reactive-labs/blob/main/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/ws/base/SocketHandler.kt

SocketHandler는 브라우가 사용하는 웹소켓의 이벤트가 서버에 진입되는 지점으로 Reactive웹소켓 모드가 이용되었습니다.

도메인 로직처리를 가지지 않으며  액터모델에 전달하는 역할만 합니다. 


이 전략은 추후 웹소켓은 StateLess로 작동해 LB전략에의해 분산할수 있으며, 액터모델은 StateFul을 담당해 AkkaCluster를 이용해 분산처리 설계를 할수 있습니다. 

비교적 구현난이도가 쉬운 Pub/Sub 브로드 캐스트만(ex> redis only) 사용해 시작하는경우 단일지점 병목 또는 도메인이 복잡할때 구현코드의 복잡성이 늘어날수 있는것을 보완할수 있습니다.


유닛테스트 코드

    @Test
    fun testSendMessageToSession() {
        val probe = testKit.createTestProbe<UserSessionResponse>()
        val session = Mockito.mock(WebSocketSession::class.java)
        val textMessage = Mockito.mock(WebSocketMessage::class.java)

        Mockito.`when`(session.id).thenReturn("session1")
        Mockito.`when`(session.textMessage(Mockito.anyString())).thenReturn(textMessage)
        Mockito.`when`(session.send(Mockito.any())).thenReturn(Mono.empty())

        val actor = testKit.spawn(UserSessionManagerActor.create())

        actor.tell(AddSession(session))
        actor.tell(SendMessageToSession("session1", "Hello"))

        // Test for DelayedMessage
        actor.tell(Ping(probe.ref()))
        probe.expectMessageClass(Pong::class.java)

        // Create an actual instance of EventTextMessage
        val expectedMessage = EventTextMessage(
            type = MessageType.SESSIONID,
            message = "Connected",
            from = MessageFrom.SYSTEM,
            id = "session1",
            jsondata = null
        )

        // Verify that textMessage was called with the correct JSON value
        val objectMapper = jacksonObjectMapper()
        val expectedJson = objectMapper.writeValueAsString(expectedMessage)
        Mockito.verify(session).textMessage(Mockito.eq(expectedJson))
    }

전체 코드 :https://github.com/psmon/kopring-reactive-labs/blob/main/KotlinBootReactiveLabs/src/test/kotlin/org/example/kotlinbootreactivelabs/ws/actor/chat/UserSessionManagerActorTest.kt


유닛테스트 코드 분석

1. 테스트 대상 기능

테스트하려는 핵심 기능은 다음과 같습니다:

  1. WebSocket 세션을 액터에 등록 (AddSession)

  2. 등록된 세션에 메시지를 보냄 (SendMessageToSession)

  3. Ping 메시지를 보내 상태 확인 (응답으로 Pong을 기대)

  4. 메시지가 WebSocket 세션을 통해 정확하게 전송되었는지 검증


🧩 코드 구성 요소 분석

🎯 val probe = testKit.createTestProbe<UserSessionResponse>()

  • 액터에서 보낸 응답 메시지를 기다리기 위한 테스트 프로브입니다.

  • 실제 메시지를 기다릴 수 있어서 비동기 액터 테스트에 적합합니다.

🧪 session, textMessage Mocking

val session = Mockito.mock(WebSocketSession::class.java)
val textMessage = Mockito.mock(WebSocketMessage::class.java)
  • 실제 네트워크/WebSocket 동작 없이 가짜 세션을 만들어 테스트합니다.

  • Mockito.when(...).thenReturn(...)으로 동작 정의.


🧠 actor.tell(...) 시나리오 실행

actor.tell(AddSession(session))
actor.tell(SendMessageToSession("session1", "Hello"))
  • WebSocket 세션을 액터에 등록하고,

  • 메시지를 특정 세션 ID로 전송하도록 명령.


⏱️ Ping - 상태 확인용 메시지

actor.tell(Ping(probe.ref()))
probe.expectMessageClass(Pong::class.java)
  • 액터가 내부적으로 메시지 처리를 마쳤는지 확인하는 수단.

  • 비동기 코드에서 완료 시점 확인을 위한 좋은 패턴.

✅ 메시지 직렬화와 검증

val expectedJson = objectMapper.writeValueAsString(expectedMessage)
Mockito.verify(session).textMessage(Mockito.eq(expectedJson))
  • EventTextMessage가 올바르게 JSON으로 변환되어 WebSocket으로 전송되었는지 검증.

  • 실질적인 출력값을 비교함으로써 테스트의 신뢰도 향상.


🌟 장점 요약

항목장점
Mock 기반 테스트실제 네트워크 없이 WebSocket 동작을 검증할 수 있어 빠르고 안정적
비동기 메시지 처리 검증testProbe + Ping/Pong 패턴으로 액터의 처리 완료 타이밍을 명확하게 파악
직렬화 검증 포함객체가 예상한 JSON 형식으로 변환되어 메시지 전송되는지까지 검증
액터 기반 테스트 구조메시지 기반 설계가 잘 반영되어 있고, 액터의 상태 관리가 유연하게 테스트 가능
단일 책임 테스트세션 등록 → 메시지 전송 → 검증까지 각 흐름이 분리되어 명확

💡 개선 포인트 (보너스)

  • verify(session).send(...)도 함께 확인하면 실제 메시지 전송까지 테스트 가능

  • 예외 상황 (예: 존재하지 않는 세션에 메시지 보낼 때)도 추가하면 완전한 커버리지 확보


액터모델로 구현이 분리된경우 그대로 이용해 API화를 할수 있으며  실시간 통신중인 1:1 연결수립된후~ API를 이용해 중간관리 계층이 개입할수도 있습니다.


관련 코드 :

해당기능을 이용할수 있는 API도 작성되어 이용할수 있습니다.



  • No labels