생산자보다 소비자의 소비처리 능력이 작다고 가정해봅시다. 소비자의 속도를 고려하지 않고 생산이 진행되면

그것을 소비하는 측에서 메모리 Full이 나거나, 진행을 더이상 못하는 상태가 발생할수 있습니다. 소비자가 멈추는경우 생산기도 흘려보낼 대상이 없기때문에 이 상태가 오랫동안 지속되면

생산라인도 작동이 멈추는 상태가 발생할수 있습니다.


액터에는 AkkaStream API를 활용하여 조절기(Throttle)를 탑재해 생산과 소비의 성능에서 발생하는 문제를 해결할수 있습니다. 

Reactive Stream 인터페이스를 준수하는 BackPressure를 활용하며 개념은 다음과 같습니다.



유체에서는 최종 출력이 늦어지면 역압력(역류)이 발생하며, 역압력에따라 생산을 자동 조절하는 유체조절기에서 실제 존재하는 장치에서 아이디어를 가지고 왔습니다.

생산을 조절하고 늦출수 있어야 전체 처리량을 안정적으로 가져가고 이 계수에따라 처리량도 더 많아 질수 있다라는 아이디어입니다.

이러한 컨셉은 오늘날의 데이터 처리에서도, 배치처럼 한꺼번에 처리하는것이아닌 유체(Stream)처럼 다루어야 한다는 컨셉으로  꼭 액터모델이 아니여도 다양한 언어에서 

Reactive Stream 인터페이스를 준수하는 데이터처리 API에서 유사한 컨셉으로 이용할수 있는 컨셉입니다.



이전에 작성된 액터모델에 이러한 개념을 탑재해 조절기를 탑재해 보겠습니다. 

코드생성 프롬프트

Akka Stream 의 Throttle를 탑재해~ Hello에 반응해 Kotlin 응답 속도를 조절하고 싶습니다. TPS 3으로 Kotlin 응답할수 있게 코드 개선 조건:

- HelloLimit 이라는 새로운 Command를 만들어 기존 로직과 분리해

- tps 측정이 메시지를 받을때마다 생성되면 안되며  actor당 하나만 가져야함(materializer )


추가된 코드

data class HelloLimit(val message: String, val replyTo: ActorRef<Any>) : HelloStateActorCommand()

/** HelloStateActor 클래스 */
class HelloStateActor private constructor(
    private val context: ActorContext<HelloStateActorCommand>,
    private var state: State
) : AbstractBehavior<HelloStateActorCommand>(context) {
......
    override fun createReceive(): Receive<HelloStateActorCommand> {
        return newReceiveBuilder()
            .onMessage(Hello::class.java, this::onHello)
            .onMessage(HelloLimit::class.java, this::onHelloLimit)
            .onMessage(GetHelloCount::class.java, this::onGetHelloCount)
            .onMessage(ChangeState::class.java, this::onChangeState)
            .build()
    }

    private val materializer = Materializer.createMaterializer(context.system)

    private val helloLimitSource = Source.queue<HelloLimit>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd ->
            when (state) {
                State.HAPPY -> {
                    if (cmd.message == "Hello") {
                        helloCount++
                        cmd.replyTo.tell(HelloResponse("Kotlin"))
                    }
                }
                State.ANGRY -> {
                    cmd.replyTo.tell(HelloResponse("Don't talk to me!"))
                }
            }
        })
        .run(materializer)


    private fun onHelloLimit(command: HelloLimit): Behavior<HelloStateActorCommand> {
        helloLimitSource.offer(command)
        return this
    }
....
  • 액터모델은 기본적으로 순차처리를 보장하는 메일박스 큐를 하나 가지고 있지만~ helloLimitSource  와같이 커스텀화된 큐를 생성하고 보유할수도 있습니다. 액터모델에서 AkkaStream이 지원하는 BackPressure 큐를 가질수 있습니다.
  • 이벤트의 블락없이 TPS를 조절할수 있다란 점은 유용하고 강력한 아이디어입니다.


코드 설명 by GPT

HelloStateActor 설명 AkkaStream과 BackPresure 개념을 함께 설명 ( 한글로 ) HelloStateActor는 Akka에서 "Hello" 상태와 관련된 다양한 명령을 처리하는 액터입니다.

이 액터는 Akka Streams를 사용하여 메시지의 흐름을 관리하며, 특히 HelloLimit 메시지의 처리 속도를 제한합니다.

주요 구성 요소는 다음과 같습니다:  


명령: 액터가 처리할 수 있는 명령을 정의합니다. 예를 들어, Hello, GetHelloCount, ChangeState, HelloLimit 등이 있습니다.

응답: 액터가 반환할 수 있는 응답을 정의합니다. 예를 들어, HelloResponse, HelloCountResponse 등이 있습니다.

상태: 액터의 상태를 나타내는 열거형입니다 (HAPPY 또는 ANGRY). 동작: 액터의 동작은 createReceive 메서드에서 정의되며, 각 명령을 어떻게 처리할지 지정합니다.


Akka Streams와 Backpressure Source.queue: 스트림 외부에서 요소를 공급할 수 있는 소스를 생성합니다. 지정된 크기와 오버플로우 전략을 가진 버퍼를 사용합니다.

Throttle: 스트림을 통과하는 요소의 속도를 제한합니다. 이 경우, 초당 3개의 메시지를 허용합니다. Sink.foreach: 스트림의 각 요소를 소비하고 작업을 수행합니다. 여기서는 액터의 상태에 따라 HelloLimit 명령을 처리합니다.

Backpressure: 메시지의 생산자(액터)가 소비자(스트림)를 압도하지 않도록 보장합니다. 버퍼가 가득 차면 생산자는 속도가 느려지거나 소비자가 따라잡을 때까지 일시 중지됩니다.

HelloStateActor는 Akka Streams를 사용하여 HelloLimit 메시지를 처리하며, 메시지 처리 속도를 제어하는 스로틀링 메커니즘을 사용합니다. 이를 통해 액터가 메시지를 제어된 속도로 처리하여 과부하를 방지합니다.



검증코드

측정하지 못하면 코드개선을 할수 없습니다. 유실검사를 포함 최종 수신 TPS를 검사할수 있습니다. 오늘날의 컴퓨팅은 속도가 충분히 빠르기 때문에 많은것을 한꺼번에 처리하는 동시처리량도 중요하지만 최종 소비과정에서 IO의 속도를 고려 처리량을 제어하는 기능도 중요합니다.

최근 Copilot의 등장으로 유닛테스트를 거의 직접 작성할 필요없이~ 코드를 참조해 유닛테스트를 작성해달라고 하면 다양한 시나리오로 작성을 해줍니다.

시간이 없어 유닛테스트 작성할 시간이 없단것은 더이상 핑계가 될수 없으며~ 자신이 작성한 코드를 설명하는 유닛테스트와 검증코드를 가동함으로 코드생산 활동을 더 가속화 할수 있을것으로 예상해봅니다.

    @Test
    fun testHelloLimitCommand() {
        val probe = testKit.createTestProbe<Any>()
        val helloStateActor = testKit.spawn(HelloStateActor.create(State.HAPPY))

        // Send 100 HelloLimit messages
        val startTime = System.currentTimeMillis()
        for (i in 1..100) {
            helloStateActor.tell(HelloLimit("Hello", probe.ref()))
        }

        // Expect 100 responses
        for (i in 1..100) {
            probe.expectMessage(HelloResponse("Kotlin"))
        }
        val endTime = System.currentTimeMillis()

        // Calculate TPS
        val durationInSeconds = (endTime - startTime) / 1000.0
        val tps = 100 / durationInSeconds
        println("TPS: $tps")

        // Verify the hello count
        helloStateActor.tell(GetHelloCount(probe.ref()))
        probe.expectMessage(HelloCountResponse(100))
    }


git : https://github.com/psmon/java-labs/blob/master/KotlinBootLabs/src/main/kotlin/actor/hellostate/HelloStateActor.kt


next : 액터모델 내에 타이머를 탑재해~ 반복작동하는 이벤트를 작성해보겠습니다.






  • No labels