코틀린에서 웹소켓에 액터모델을 연결시켜 분산 상태처리가 가능한 웹소켓 모델로 업그레이드 시도를 해보겠습니다.
구현하고자 하는 웹소켓 이벤트 Flow
액터모델에 웹소켓을 연결하기전 먼저 웹소켓에대한 기본을 알아보겠습니다.
웹소켓 개발을 처음시작할때 기본작동 단계처리를 건너띄고 복잡한 도메인 모델을 먼저 연결시도하려다 성능적으로 실패하는 케이스를 많이 봐왔습니다.
웹소켓의 연결지향 프로토콜에 대한 설명은 생략하고 작성해야할 기본 요소를 구현해보겠습니다.
우선 사용자에게 발생한 이벤트를 우아하게 처리하기 위해 이벤트의 흐름과 처리를 어느 부분에 해야할지 고민해야합니다.
여기서는 크게 3가지 Part로 구분하였으며
다음과 같은 논리적 역할을 먼저 부여합니다.
- WebSocket Client : 브라우저에서 접속을 하는 클라이언트 객체입니다. EndUser 사용자로 브라우저에 존재합니다.
- WebSocket Handler : EndUser 사용자로 부터 Connect/Disconnect/Send 등의 이벤트를 받아 처리하는 서버쪽 객체입니다.
- SessionManager : WebSocket Handler에 접속된 클라이언트 세션을 관리하는 서버모듈로, 접속한 전체 세션을 관리합니다.
WebSocket은 양방향가능 으로 웹환경에서 RealTime메시지를 처리하기 적합하며 이러한 장점을 이용해 RestFul에서 하기어려운 동작을 할수 있습니다.
다음과 같이 3가지 기능에 초첨을 두고 기본 SpringBOOT Kotlin에서 사용가능한 소켓모델을 만들어 보겠습니다.
- 세션관리 : 서버에서는 접속한 사용자의 웹소켓세션을 관리해야하며 Connect/DisConnect를 잘 관리해야합니다.
- PUB/SUB : 접속관리는 시스템적인 부분이며 메시징처리를 우아하게 보내려면 메시지 그룹을 논리적으로 처리하기위해 구독모델(PUB/SUB)을 기본으로 지원해야합니다.
- 구독은 클라이언트가 능동적으로 할수있으며, 서버는 API를 통해 특정 구독자를 타켓팅해 보낼수 있습니다.
- 서버 TO 세션 : 서버는 세션 ID를 알고 있다고하면 특정 사용자 한명을 Pick해 메시지를 보낼수있어야합니다.
- 이 장치가 없으면 대부분 특정 사용자에게만 보낼 메시지를 브로드캐스트 또는 특정 토픽에 모두 보낸후 클라이언트 또는 특정지점에서 필터를 하게됩니다. 이것은 심각한 보안위협이자 나중에는 트래픽 성능 이슈가 될수 있습니다.
WebSocketSessionManager
@Component class WebSocketSessionManager { private val logger = LoggerFactory.getLogger(WebSocketSessionManager::class.java) val sessions = ConcurrentHashMap<String, WebSocketSession>() val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>() fun addSession(session: WebSocketSession) { sessions[session.id] = session logger.info("Connected: ${session.id}") } fun removeSession(session: WebSocketSession) { sessions.remove(session.id) logger.info("Disconnected: ${session.id}") } fun subscribeToTopic(sessionId: String, topic: String) { topicSubscriptions.computeIfAbsent(topic) { mutableSetOf() }.add(sessionId) logger.info("Session $sessionId subscribed to topic $topic") } fun unsubscribeFromTopic(sessionId: String, topic: String) { topicSubscriptions[topic]?.remove(sessionId) logger.info("Session $sessionId unsubscribed from topic $topic") } fun sendMessageToSession(sessionId: String, message: String) { sessions[sessionId]?.sendMessage(TextMessage(message)) } fun sendMessageToTopic(topic: String, message: String) { topicSubscriptions[topic]?.forEach { sessionId -> sessions[sessionId]?.sendMessage(TextMessage(message)) } } }
해당 클래스는 WebSocketSession을 관리하며 추가로 구독모델을 지원하게되며~ 싱글톤으로 클라이언트 세션을 관리할 관리자가 존재하게 됩니다.
MyWebSocketHandler
@Component class MyWebSocketHandler(private val sessionManager: WebSocketSessionManager) : TextWebSocketHandler() { override fun afterConnectionEstablished(session: WebSocketSession) { sessionManager.addSession(session) } override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) { sessionManager.removeSession(session) } override fun handleTextMessage(session: WebSocketSession, message: TextMessage) { val payload = message.payload when { payload.startsWith("subscribe:") -> { val topic = payload.substringAfter("subscribe:") sessionManager.subscribeToTopic(session.id, topic) } payload.startsWith("unsubscribe:") -> { val topic = payload.substringAfter("unsubscribe:") sessionManager.unsubscribeFromTopic(session.id, topic) } else -> { session.sendMessage(TextMessage("Echo: $payload")) } } } }
클라이언트 웹소켓을 처리하는 핵심 핸들러이며 , 클라이언트로부터 받은 메시지의 수신부를 처리하게 됩니다.
클라이언트 접속당 1개씩 생기는 핸들러로 접속관리및 구독처리부분은, 처음에 구현한 세션관리자에게 위임하였습니다.
WebSocketConfig
@Configuration @EnableWebSocket class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler, private val sessionManager: WebSocketSessionManager ) : WebSocketConfigurer { @Bean fun webSocketHandler() = MyWebSocketHandler(sessionManager) override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) { // Local WebSocket handler registry.addHandler(webSocketHandler, "/ws") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") } }
Spring Boot 웹서버에서 클라이언트가 연결할 엔드포인트를 생성해 연결지점을 만들어 주며 우리가 만든 핸들러를 연결시킵니다.
API Controller
@RestController class WebSocketController(private val sessionManager: WebSocketSessionManager, private val sessionManagerActor: ActorRef<WebSocketSessionManagerCommand> ) { @PostMapping("/send-to-session") fun sendMessageToSession(@RequestParam sessionId: String, @RequestBody message: String): String { sessionManager.sendMessageToSession(sessionId, message) return "Message sent to session $sessionId" } @PostMapping("/send-to-topic") fun sendMessageToTopic(@RequestParam topic: String, @RequestBody message: String): String { sessionManager.sendMessageToTopic(topic, message) return "Message sent to topic $topic" } }
구독한 사용자에게 메시지 보내기, 특정 세션에 메시지 보내기등 API 기능으로 , 서버에서 발생시킬수 있는 기능을 기능화할수 있습니다.
WebSocket Test Front
웹소켓을 테스트할수 있는 다양한 툴들이 있을테지만~ 브라우저에서 작동하는 간단한 테스트 페이지를 생성해
우리가 작성한 서버를 테스트 할수 있습니다. 서버에서 임의 발생시키는 이벤트는 API화 하여 Swagger를 함께 운영해
클라이언 ↔ 서버 ↔ 클라이언트 , 발생 이벤트를 테스트할수 있습니다.
소스 : https://github.com/psmon/java-labs/blob/master/KotlinBootLabs/src/main/resources/static/wstest.html
여기까지가 웹소켓을 활용해야하기 위해 기본이 되는 모델이며 , 우리가 여기서 구현해야할 도메인모델은 사실상 더 복잡하며
이 모델만으로는 복잡한 도메인을 단순하게 풀기어려우며 분산처리가 안되는 구조로 성능의 한계에 금방 도달하게 됩니다.
이 소켓모델을 액터모델로 변신하는 코드를 먼저 살펴보고 이 방식을 채택했을때 얻을수 있는 이점을 살펴보겠습니다.
액터모델로 연결
액터모델에 대한 설명은 이전 장(HelloActor)에 있으며~ Socket의 Session을 관리했던 클래스를 액터모델로 변환을 시켜보겠습니다.
WebSocketSessionManagerActor
sealed class WebSocketSessionManagerCommand sealed class WebSocketSessionManagerResponse data class AddSession(val session: WebSocketSession) : WebSocketSessionManagerCommand() data class RemoveSession(val session: WebSocketSession) : WebSocketSessionManagerCommand() data class SubscribeToTopic(val sessionId: String, val topic: String) : WebSocketSessionManagerCommand() data class UnsubscribeFromTopic(val sessionId: String, val topic: String) : WebSocketSessionManagerCommand() data class SendMessageToSession(val sessionId: String, val message: String) : WebSocketSessionManagerCommand() data class SendMessageToTopic(val topic: String, val message: String) : WebSocketSessionManagerCommand() data class GetSessions(val replyTo: ActorRef<WebSocketSessionManagerResponse>) : WebSocketSessionManagerCommand() data class SessionsResponse(val sessions: Map<String, WebSocketSession>) : WebSocketSessionManagerResponse() data class Ping(val replyTo: ActorRef<WebSocketSessionManagerResponse>) : WebSocketSessionManagerCommand() data class Pong(val message: String) : WebSocketSessionManagerResponse() class WebSocketSessionManagerActor private constructor( context: ActorContext<WebSocketSessionManagerCommand> ) : AbstractBehavior<WebSocketSessionManagerCommand>(context) { companion object { fun create(): Behavior<WebSocketSessionManagerCommand> { return Behaviors.setup { context -> WebSocketSessionManagerActor(context) } } } private val logger = LoggerFactory.getLogger(WebSocketSessionManagerActor::class.java) private val sessions = ConcurrentHashMap<String, WebSocketSession>() private val topicSubscriptions = ConcurrentHashMap<String, MutableSet<String>>() override fun createReceive(): Receive<WebSocketSessionManagerCommand> { return newReceiveBuilder() .onMessage(AddSession::class.java, this::onAddSession) .onMessage(RemoveSession::class.java, this::onRemoveSession) .onMessage(SubscribeToTopic::class.java, this::onSubscribeToTopic) .onMessage(UnsubscribeFromTopic::class.java, this::onUnsubscribeFromTopic) .onMessage(SendMessageToSession::class.java, this::onSendMessageToSession) .onMessage(SendMessageToTopic::class.java, this::onSendMessageToTopic) .onMessage(GetSessions::class.java, this::onGetSessions) .onMessage(Ping::class.java, this::onPing) .build() } private fun onPing(command: Ping): Behavior<WebSocketSessionManagerCommand> { command.replyTo.tell(Pong("Pong")) return this } private fun onGetSessions(command: GetSessions): Behavior<WebSocketSessionManagerCommand> { command.replyTo.tell(SessionsResponse(sessions.toMap())) return this } private fun onAddSession(command: AddSession): Behavior<WebSocketSessionManagerCommand> { sessions[command.session.id] = command.session logger.info("Connected: ${command.session.id}") return this } private fun onRemoveSession(command: RemoveSession): Behavior<WebSocketSessionManagerCommand> { sessions.remove(command.session.id) logger.info("Disconnected: ${command.session.id}") return this } private fun onSubscribeToTopic(command: SubscribeToTopic): Behavior<WebSocketSessionManagerCommand> { topicSubscriptions.computeIfAbsent(command.topic) { mutableSetOf() }.add(command.sessionId) logger.info("Session ${command.sessionId} subscribed to topic ${command.topic}") return this } private fun onUnsubscribeFromTopic(command: UnsubscribeFromTopic): Behavior<WebSocketSessionManagerCommand> { topicSubscriptions[command.topic]?.remove(command.sessionId) logger.info("Session ${command.sessionId} unsubscribed from topic ${command.topic}") return this } private fun onSendMessageToSession(command: SendMessageToSession): Behavior<WebSocketSessionManagerCommand> { sessions[command.sessionId]?.sendMessage(TextMessage(command.message)) return this } private fun onSendMessageToTopic(command: SendMessageToTopic): Behavior<WebSocketSessionManagerCommand> { topicSubscriptions[command.topic]?.forEach { sessionId -> sessions[sessionId]?.sendMessage(TextMessage(command.message)) } return this } }
- 기존 클래스를 메일박스를 이용한 이벤트 처리방식인 액터모델로 변경이 되었습니다.
ActorWebSocketHandler
class ActorWebSocketHandler(private val sessionManagerActor: ActorRef<WebSocketSessionManagerCommand>) : TextWebSocketHandler() { override fun afterConnectionEstablished(session: WebSocketSession) { sessionManagerActor.tell(AddSession(session)) } override fun afterConnectionClosed(session: WebSocketSession, status: org.springframework.web.socket.CloseStatus) { sessionManagerActor.tell(RemoveSession(session)) } override fun handleTextMessage(session: WebSocketSession, message: TextMessage) { val payload = message.payload when { payload.startsWith("subscribe:") -> { val topic = payload.substringAfter("subscribe:") sessionManagerActor.tell(SubscribeToTopic(session.id, topic)) } payload.startsWith("unsubscribe:") -> { val topic = payload.substringAfter("unsubscribe:") sessionManagerActor.tell(UnsubscribeFromTopic(session.id, topic)) } else -> { session.sendMessage(TextMessage("Echo: $payload")) } } } }
- 웹소켓의 핸들러부분이며 , 세션관리포함 구독기능을 액터모델인 SessionManagerActor에게 위임을 하였습니다.
- 꼭 액터모델이 아니더라도~ 더 좋은 장치가 있다고하면 ( Redis or RabbitMQ or Other MQ)로 연결할때도 Handeler 처리부분을 설계할수 있습니다.
AkkaConfiguration
@Configuration class AkkaConfiguration { private lateinit var actorSystem: ActorSystem<WebSocketSessionManagerCommand> @PostConstruct fun init() { actorSystem = ActorSystem.create(WebSocketSessionManagerActor.create(), "WebSocketSystem") } @PreDestroy fun shutdown() { actorSystem.terminate() } @Bean fun sessionManagerActor(): ActorRef<WebSocketSessionManagerCommand> { return actorSystem } }
- 액터시스템을 시작하고 , WebSocketSessionManagerCommand를 처리할수 있는 액터를 생성해, 액터참조를 자동주입 사용할수 있게 구현되었습니다.
- 참고 : 액터시스템은 더 상위의 관리자 액터를 루트로 두고 하위모델을 구조화해 배치할수 있습니다. 관리해야할 액터모델이 여러개인경우 Stage를 관리할 SuperVisor급 액터를 생성하는것이 권장됩니다.
- 스테이지는 무대이고 액터는 배우이고 다양한 액터모델을 상호작용가능한 논리적 구성을 하고 무대에 잘 배치를 시키고 배우에게 액션을 보내 시나리오를 짤수가 있는것이 AKKA 액터모델의 특징입니다.
추가되는 WebSocketConfig
class WebSocketConfig(private val webSocketHandler: MyWebSocketHandler, private val actorWebSocketHandler: ActorWebSocketHandler, private val sessionManager: WebSocketSessionManager ) : WebSocketConfigurer { @Bean fun webSocketHandler() = MyWebSocketHandler(sessionManager) override fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) { // Local WebSocket handler registry.addHandler(webSocketHandler, "/ws") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") // Actor WebSocket handler registry.addHandler(actorWebSocketHandler, "/ws-actor") .addInterceptors(HttpSessionHandshakeInterceptor()) .setAllowedOrigins("*") } }
- 웹소켓 EndPoint는 다수개 운영할수 있으며 로컬에서만 작동하는 기본 소켓모델과 , 액터모델과 상호운영하는 소켓모델 동시에 작동시켜볼수 있습니다.
WebSocketSessionManagerActorTest
class WebSocketSessionManagerActorTest { companion object { private lateinit var testKit: ActorTestKit @BeforeAll @JvmStatic fun setup() { testKit = ActorTestKit.create() } @AfterAll @JvmStatic fun teardown() { testKit.shutdownTestKit() } } @Test fun testAddAndRemoveSession() { val session = Mockito.mock(WebSocketSession::class.java) Mockito.`when`(session.id).thenReturn("session1") val actor = testKit.spawn(WebSocketSessionManagerActor.create()) actor.tell(AddSession(session)) // Verify session added (you can add more detailed checks if needed) actor.tell(RemoveSession(session)) // Verify session removed (you can add more detailed checks if needed) } @Test fun testSubscribeAndUnsubscribeToTopic() { val session = Mockito.mock(WebSocketSession::class.java) Mockito.`when`(session.id).thenReturn("session1") val actor = testKit.spawn(WebSocketSessionManagerActor.create()) actor.tell(AddSession(session)) actor.tell(SubscribeToTopic("session1", "topic1")) // Verify subscription (you can add more detailed checks if needed) actor.tell(UnsubscribeFromTopic("session1", "topic1")) // Verify unsubscription (you can add more detailed checks if needed) } @Test fun testSendMessageToSession() { val probe = testKit.createTestProbe<WebSocketSessionManagerResponse>() val session = Mockito.mock(WebSocketSession::class.java) Mockito.`when`(session.id).thenReturn("session1") val actor = testKit.spawn(WebSocketSessionManagerActor.create()) actor.tell(AddSession(session)) actor.tell(SendMessageToSession("session1", "Hello")) // Test for DelayedMessage actor.tell(Ping(probe.ref())) probe.expectMessageClass(Pong::class.java) Mockito.verify(session).sendMessage(TextMessage("Hello")) } @Test fun testSendMessageToTopic() { val probe = testKit.createTestProbe<WebSocketSessionManagerResponse>() val session1 = Mockito.mock(WebSocketSession::class.java) val session2 = Mockito.mock(WebSocketSession::class.java) Mockito.`when`(session1.id).thenReturn("session1") Mockito.`when`(session2.id).thenReturn("session2") val actor = testKit.spawn(WebSocketSessionManagerActor.create()) actor.tell(AddSession(session1)) actor.tell(AddSession(session2)) actor.tell(SubscribeToTopic("session1", "topic1")) actor.tell(SubscribeToTopic("session2", "topic1")) actor.tell(SendMessageToTopic("topic1", "Hello Topic")) actor.tell(Ping(probe.ref())) probe.expectMessageClass(Pong::class.java) Mockito.verify(session1).sendMessage(TextMessage("Hello Topic")) Mockito.verify(session2).sendMessage(TextMessage("Hello Topic")) } @Test fun testGetSessions() { val session1 = Mockito.mock(WebSocketSession::class.java) val session2 = Mockito.mock(WebSocketSession::class.java) Mockito.`when`(session1.id).thenReturn("session1") Mockito.`when`(session2.id).thenReturn("session2") val actor = testKit.spawn(WebSocketSessionManagerActor.create()) val probe = testKit.createTestProbe<WebSocketSessionManagerResponse>() actor.tell(AddSession(session1)) actor.tell(AddSession(session2)) actor.tell(GetSessions(probe.ref())) val response = probe.expectMessageClass(SessionsResponse::class.java) assertEquals(2, response.sessions.size) assertTrue(response.sessions.containsKey("session1")) assertTrue(response.sessions.containsKey("session2")) } @Test fun testPingPong() { val actor = testKit.spawn(WebSocketSessionManagerActor.create()) val probe = testKit.createTestProbe<WebSocketSessionManagerResponse>() actor.tell(Ping(probe.ref())) val response = probe.expectMessageClass(Pong::class.java) assertEquals("Pong", response.message) } }
- 실시간으로 작동하는 웹소켓의 유닛테스트를 만들기는 어려울수 있습니다. 여기서는 핵심 모델을 액터로 분리해 기능테스트를 수행하고 소켓세션을 Mock로 생성해 유닛테스트에 이점이 있는 코드를 생성할수 있습니다.
- 유닛테스트가 가능하다란점은 코드를 통해 성능테스트 또한 진행될수 있음을 의미하게 됩니다.
여기까지가 쓸만한 기능을 가진 웹소켓모듈을 액터모델에 연동해보았으며~ 액터모델에 연결되고 나면 액터모델이 가진 다양한 메시지패턴들을 활용해 상호운영할수 있게됩니다.
웹소켓 모듈을 액터모델과 상호운영했을때 장점을 추가로 살펴보겠습니다.
Socket Session성능분리
웹소켓 노드 1대가 만명을 받아줄수 있는 구조라고 가정해보고 이 노드에 DB를 호출한다던지 단일지점 병목이 있는
기능을 계속 만들어내게되면 이 서버가 처리할량이 점점 느려지게 되며 스케일아웃을 할수록 단일지점 병목을 만들어내 시스템을 중단하는 상황을 만들수 있습니다.
웹소켓처리 서버는 사용자를 받아줄수 있는 진입점의 노드이기때문에 이 노드의 성능 관심사는 엔드유저 사용자의 트래픽을 얼마나 잘 받아줄수 있는가에 대해 관심사가 포커싱이 되어야하며
핵심 도메인 모델을 웹소켓 핸들러 내부에 구현하지않고 분리하는것은 성능전략을 분리할수 있습니다.
- Wesocket Session : 이 노드는 최소한의 인증 기능과 이벤트 흐름의 엣지연결만 구현하고 도메인 모델 구현부를 분리해냅니다.
- State Cluster : Redis / RabbitMQ 등 웹소켓과 상호연동해 상태를 관리하고 도메인모델을 분리할수 있어야하며 여기서의 관심사는 단일지점 병목현상(DB) 에대해 분산처리를 어떻게 할지에 대해 집중해야합니다.
- 웹소켓 처리 모듈을 분리하는것이 최종미션이 아니라 도메인 모델의 상태를 이벤트 드리븐방식이 아닌 CRUD방식을 그대로 이용한다고하면 성능 파멸시나리오로 이어지게됩니다.
액터모델을 채택하는 이유
분산처리 가능한 웹소켓서버의 상태를 관리하기 위해 다양한 외부장치를 함께 운영할수 있으며 대표적으로 다음과 같은 장치를 이용합니다.
- Redis : 캐싱처리와 같이 key/value 를빠르게 처리하는 스토리지이지만 pub/sub이 지원한다고 mq처럼 연결해 이용하면 성능제약에 빠르게 도달할수 있습니다.
- MQ(Kafka/RabbitMQ) : 메시지를 오랫동안 보유해야하고(영속성) 단방향 고성능인 경우 Kafka가 유리하며 복잡한 채널에 Pub/Sub및 라우팅을 지원하는 경우 RabbitMQ가 유리합니다. 하지만 메시지 흐름에따라 작동방식이 바뀌는 도메인의 상태를 설계하기에는 한계가 있습니다. 결국 상태를 관리하기 위해 Redis및 DB를 추가로 이용하게 됩니다.
액터모델은 고성능 메시지 큐와 상태관리 기능 두가지를 모두가지고 있으며~ 로컬로 작성된 액터모델은 코드의 변경없이 Cluster화가 가능해
분산처리 변신에 용이합니다.
스레드 모델 VS 액터 모델
성능향상을 위해 스레드 모델로 시작할수도 있지만 스레드 모델을 분산처리하려면 TCP Remote라는 네트워크 프로그래밍이 추가적으로 필요하게 됩니다.
액터모델의 경우 리모트 또는 분산처리하기위해 네트워크 프로그래밍을 전혀 할필요가 없으며 로컬액터의 큰 코드수정없이 설정파일로 배치규칙만 작성하면 됩니다.
액터간 리모트 통신은 TCP Peer to Peer를 이용하기 때문에 고성능 MQ로 알고 있는 Kafka보다 성능이 좋으며 자바진영에
고성능 TCP모듈로 알고 있는 netty를 선택할수 있으며, 더 성능이 좋은 프로토콜이 나온다면 교체가가능합니다.
클러스터를 포함 네트워크 프로그래밍은 이것을 설정하는것으로 끝나게 됩니다. ( 코드레벨에서 분리해 설정화로 분리됨 )
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { netty.tcp { port = 2552 } } }
이 성능은 단순하게 메시지 전송능력만을 측정하였을때 이고 액터모델은 Kafka 기능을 대신할수도 있겠지만
Reactive Stream을 상호 준수하기 때문에 액터모델에 Stream을 연결하여 Kafka가 가진 장점을 활용할수도 있습니다.
AKKA를 Reactive Stream을 준수하는 클라이언트에 모든것을 연결하는 부가 활동이 Alpaka 입니다.
https://doc.akka.io/docs/alpakka/current/index.html
액터 클러스터화하기
기존 로컬액터의 큰수정없이 액터객체를 클러스터를 묶는 기법은, 아래 샘플코드를 통해 확인해볼수 있습니다.
과거 자바버전으로 시도된 예제로 kotlin 호환되기때문에 전환가능합니다. spring 어플리케이션을 모니터링하고 API GateWay를 자체 구축할수 있는 springcloud를 적용할지 여부는 옵셔널입니다. ( 컨셉만 참고 )
모니터링
클러스터에 배치된 액터의 메시지 흐름상태는 다양한 시각화툴을통해 모니터링을 할수 있습니다.
여기서 소개하는 코드는 실제 작동되는 코드로 아래 저장소를 통해 수행해볼수 있으며~ Local Fist로 단일환경에서 시도해볼수 있습니다.
NEXT : 액터를 통한 상태와 영속성 프로그래밍