Reactive 모드의 웹소켓을 활용해 PubSub의 기능을 액터모델을 통해 구현하는것을 간략하게 알아보고
웹소켓은 브라우저와 양방향 통신이 가능한 특징이 있으며 , 웹소켓 핸들러를 이용해 액터모델에 연결할수 있으며, 액터모델이 가진 특징을 이용해 분산처리로 확장을 할수 있습니다.
CRUD 테스트인경우 블락킹 기법의 일반적 유닛테스트를 이용하면 되지만, 실시간 이벤트가 작동하는 모듈을 중단시키지 않고 블락없이 최종 수신 검증을 하는 방법을 알아보겠습니다.
Reactive Websocket with Actor 이벤트 흐름표
- WebClient : 브라우저에서 웹소켓 이벤트를 발생시킵니다.
- SocketHandler : 서버에서 웹소켓 이벤트를 수신받을수 있습니다. 수신받은 이벤트를 액터모델에 전달합니다.
- UserSessionActor : 웹소켓 세션을 액터모델을 통해 관리합니다.
웹소켓이라던지 핸들러라던지 개발용어가 가득합니다. 구현된 모델을 도메인 언어를 사용해 이벤트 스토밍으로 풀어보겠습니다.
이벤트 스토밍으로 풀어본 PUBSUB 과정
와이어 프레임만 그려 빈약한 기획문서에 시간을 대부분 사용하는 과정을 가속화하는 방법으로 DDD에 이벤트 스토밍으로 소개되고 있습니다.
데이터중심이 아닌, 이벤트 중심 설계 특히 액터모델에 도메인을 녹이고자할때 도입되는 기법으로 , 개인해석에 의한 간소화 버전으로 이용해 보았습니다.
구현된 코드의 작동에 대한 설명은 이러하고 리액티브 웹소켓+액터모델을 통해 PUBSUB 구현된 코드를 간략하게 먼저 살펴보고
유닛테스트를 수행해보겠습니다.
UserSessionManager Actor
UserSessionManaver Actor는 웹소켓의 세션을 관리하고 PubSub이벤트를 중계처리합니다.
SocketHandler
SocketHandler는 브라우가 사용하는 웹소켓의 이벤트가 서버에 진입되는 지점으로 Reactive웹소켓 모드가 이용되었습니다.
도메인 로직처리를 가지지 않으며 액터모델에 전달하는 역할만 합니다.
이 전략은 추후 웹소켓은 StateLess로 작동해 LB전략에의해 분산할수 있으며, 액터모델은 StateFul을 담당해 AkkaCluster를 이용해 분산처리 설계를 할수 있습니다.
비교적 구현난이도가 쉬운 Pub/Sub 브로드 캐스트만(ex> redis only) 사용해 시작하는경우 단일지점 병목 또는 도메인이 복잡할때 구현코드의 복잡성이 늘어날수 있는것을 보완할수 있습니다.
유닛테스트 코드
@Test
fun testSendMessageToSession() {
val probe = testKit.createTestProbe<UserSessionResponse>()
val session = Mockito.mock(WebSocketSession::class.java)
val textMessage = Mockito.mock(WebSocketMessage::class.java)
Mockito.`when`(session.id).thenReturn("session1")
Mockito.`when`(session.textMessage(Mockito.anyString())).thenReturn(textMessage)
Mockito.`when`(session.send(Mockito.any())).thenReturn(Mono.empty())
val actor = testKit.spawn(UserSessionManagerActor.create())
actor.tell(AddSession(session))
actor.tell(SendMessageToSession("session1", "Hello"))
// Test for DelayedMessage
actor.tell(Ping(probe.ref()))
probe.expectMessageClass(Pong::class.java)
// Create an actual instance of EventTextMessage
val expectedMessage = EventTextMessage(
type = MessageType.SESSIONID,
message = "Connected",
from = MessageFrom.SYSTEM,
id = "session1",
jsondata = null
)
// Verify that textMessage was called with the correct JSON value
val objectMapper = jacksonObjectMapper()
val expectedJson = objectMapper.writeValueAsString(expectedMessage)
Mockito.verify(session).textMessage(Mockito.eq(expectedJson))
}



