액터는 메시지 분산처리를 위해 분배기를 탑재해 하위에 구성할수 있습니다.

  • 라우터 : 분배기 , 분배가 시작되는 지점
  • 라우팅 : 분배되는 방식
  • 라우티 : 분배된 메시지가 도달하는 목적지


분배를 위한 라우터의 구성집합은 다음 두가지를 활용할수 있습니다.

  • POOL : 동일한 액터를 묶을수 있음
  • GROUP : 성격이 다른 액터를 묶을수 있음



Akka는 다양한 종류의 라우터를 제공하여 여러 액터에 메시지를 분산시킬 수 있도록 합니다. Akka의 라우터 종류는 다음과 같습니다:

  1. RoundRobinRouter:

    • 메시지를 액터 풀의 각 액터에 순서대로 보냅니다. 라운드로빈 방식으로 고르게 분산하여 부하를 분산합니다.
  2. RandomRouter:

    • 메시지를 액터 풀의 액터 중 무작위로 선택하여 전송합니다. 랜덤 선택으로 부하를 분산합니다.
  3. SmallestMailboxRouter:

    • 가장 적은 메시지를 보유하고 있는 액터에게 메시지를 보냅니다. 주로 부하가 고르게 분산되도록 사용합니다.
  4. BroadcastRouter:

    • 모든 액터에게 동일한 메시지를 보냅니다. 모든 액터가 동일한 메시지를 처리해야 할 때 사용됩니다.
  5. ScatterGatherFirstCompletedRouter:

    • 메시지를 풀에 있는 모든 액터에게 보내고, 가장 먼저 응답을 받는 액터의 결과를 반환합니다. 응답 시간이 중요한 경우에 유용합니다.
  6. ConsistentHashingRouter:

    • 해싱을 통해 메시지를 특정한 액터로 라우팅합니다. 동일한 키를 가진 메시지가 항상 동일한 액터에게 전달되도록 보장합니다.
  7. TailChoppingRouter:

    • 메시지를 첫 번째 액터에게 보내고, 일정 시간 후에 응답이 없으면 다음 액터에게 보내는 방식을 반복합니다. 타임아웃과 재시도를 사용하여 최적의 응답을 찾습니다.

이러한 라우터를 사용하면 여러 액터로 작업을 분산하고, 시스템 성능을 최적화하며, 부하를 균등하게 관리할 수 있습니다. Akka는 각 라우터가 상황에 맞게 사용될 수 있도록 다양한 옵션을 제공합니다.



앞장에서 작성한 HelloActor의 분배처리를 위한 상위 액터를 생성시도해보겠습니다.

코드생성 프롬프트

HelloActor를 관리하는 HelloManagerActor를 생성하고싶습니다.
다음 규칙을 준수, 이 기반으로 액터모델을 작성해

- HelloActor를  5개 Pool로 생성구성
- 라운드 로빈(순차분배) 방식으로 명령을 내리고 싶습니다.


HelloManagerActor

HelloManagerActor
package actor.router
import actor.Hello
import actor.HelloActor
import actor.HelloActorCommand
import actor.HelloActorResponse
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.javadsl.AbstractBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.actor.typed.javadsl.Behaviors
import akka.actor.typed.javadsl.Receive
import akka.actor.typed.javadsl.PoolRouter
import akka.actor.typed.javadsl.Routers

sealed class HelloManagerCommand
data class DistributedHelloMessage(val message: String, val replyTo: ActorRef<HelloActorResponse>) : HelloManagerCommand()

class HelloManagerActor private constructor(
    context: ActorContext<HelloManagerCommand>,
    private var router: PoolRouter<HelloActorCommand>,
    private val routerRef: ActorRef<HelloActorCommand>
) : AbstractBehavior<HelloManagerCommand>(context) {

    companion object {
        fun create(): Behavior<HelloManagerCommand> {
            return Behaviors.setup { context ->

                val router = Routers.pool(5, Behaviors.supervise(HelloActor.create())
                    .onFailure(SupervisorStrategy.restart()))
                    .withRoundRobinRouting()

                val routerRef = context.spawn(router, "hello-actor-pool")

                HelloManagerActor(context, router, routerRef)
            }
        }
    }

    override fun createReceive(): Receive<HelloManagerCommand> {
        return newReceiveBuilder()
            .onMessage(DistributedHelloMessage::class.java, this::onSendHelloMessage)
            .build()
    }

    private fun onSendHelloMessage(command: DistributedHelloMessage): Behavior<HelloManagerCommand> {
        routerRef.tell(Hello(command.message, command.replyTo))
        return this
    }
}

중요 코드

  • withRoundRobinRouting : akka 제공 라우팅을 선택하거나 커스텀하게 구성할수도 있습니다.
  • SupervisorStrategy.restart() : 라우티에 연결된 액터가 죽는경우 복구옵션을 선택할수 있습니다.
  • routerRef : 최종생성된 라우터로 액터와 동일하게 tell을 통해 분산처리 이벤트를 전송할수 있습니다.
  • "hello-actor-pool" : pool 이름을 명시적으로 지정해 추적및 별할수 있습니다.

HelloManagerActorTest

HelloManagerActorTest
package actor.router

import actor.HelloActorResponse
import actor.HelloResponse
import akka.actor.testkit.typed.javadsl.ActorTestKit
import akka.actor.testkit.typed.javadsl.TestProbe
import akka.actor.typed.ActorRef
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test

class HelloManagerActorTest {

    companion object {
        private val testKit = ActorTestKit.create()

        @JvmStatic
        @BeforeAll
        fun setup() {
            // Setup code if needed
        }

        @JvmStatic
        @AfterAll
        fun tearDown() {
            testKit.shutdownTestKit()
        }
    }

    @Test
    fun testSendHelloMessage() {
        val helloManager: ActorRef<HelloManagerCommand> = testKit.spawn(HelloManagerActor.create())
        val probe = TestProbe.create<HelloActorResponse>(testKit.system())

        helloManager.tell(DistributedHelloMessage("Hello", probe.ref()))
        helloManager.tell(DistributedHelloMessage("Hello", probe.ref()))
        helloManager.tell(DistributedHelloMessage("Hello", probe.ref()))
        helloManager.tell(DistributedHelloMessage("Hello", probe.ref()))
        helloManager.tell(DistributedHelloMessage("Hello", probe.ref()))

        probe.expectMessage(HelloResponse("Kotlin"))
        probe.expectMessage(HelloResponse("Kotlin"))
        probe.expectMessage(HelloResponse("Kotlin"))
        probe.expectMessage(HelloResponse("Kotlin"))
        probe.expectMessage(HelloResponse("Kotlin"))

    }
}


TEST결과

[akka://HelloManagerActorTest/user/$a/hello-actor-pool/$d] Received valid Hello message. Count incremented to 1
[akka://HelloManagerActorTest/user/$a/hello-actor-pool/$c] Received valid Hello message. Count incremented to 1
[akka://HelloManagerActorTest/user/$a/hello-actor-pool/$a] Received valid Hello message. Count incremented to 1
[akka://HelloManagerActorTest/user/$a/hello-actor-pool/$e] Received valid Hello message. Count incremented to 1
[akka://HelloManagerActorTest/user/$a/hello-actor-pool/$b] Received valid Hello message. Count incremented to 1
INFO  akka.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
> Task :test
  • context.self.path()를 통해 액터에 접근가능한 주소를 알수 있으며~ 동일한 액터가 아닌 다른 액터에게 분산처리가 되는지 확인할수 있습니다.

액터모델을 이용해 다양한 문제를 해결할수 있는, AKKA에서 제공하는 다양한 메시지 패턴  툴들을 알아보았습니다.

액터모델은 동일언어를 사용하는 클러스터내에서 상태관리할수 있는 장점이 있지만, 브라우저와 같이 최종 엔드사용자를 위해 제공되는 프로토콜은 아닙니다.

다음장에서는 웹소켓 핸들러와 액터모델을 연결해 최종 사용자의 세션정보를 제어하고 실시간 이벤트를 전달하는 방법을 알아보겠습니다.

Next








  • No labels