FSM 개념을 사용해 긴시간마다 작동되는 배치처리가 아닌, 준실시간성 벌크처리 액터를 구현시도를 해보겠습니다.

유한상태기계(Finite State Machine)

위키백과, 우리 모두의 백과사전.

유한 상태 기계(finite-state machine, FSM) 또는 유한 오토마톤(finite automaton, FA; 복수형: 유한 오토마타 finite automata)는 컴퓨터 프로그램과 전자 논리 회로를 설계하는데에 쓰이는 수학적 모델이다. 간단히 상태 기계라고 부르기도 한다. 유한 상태 기계는 유한한 개수의 상태를 가질 수 있는 오토마타, 즉 추상 기계라고 할 수 있다. 이러한 기계는 한 번에 오로지 하나의 상태만을 가지게 되며, 현재 상태(Current State)란 임의의 주어진 시간의 상태를 칭한다. 이러한 기계는 어떠한 사건(Event)에 의해 한 상태에서 다른 상태로 변화할 수 있으며, 이를 전이(Transition)이라 한다. 특정한 유한 오토마톤은 현재 상태로부터 가능한 전이 상태와, 이러한 전이를 유발하는 조건들의 집합으로서 정의된다.


'안녕?'을  한단어씩 검사하는 상태머신



이 아이디어는 단순하지만~ 불규칙적으로 발생하는 이벤트를 안정적으로 처리하는 벌크처리기에 적용 할수 있습니다.

( 이벤트가 발생할때마다 1 Insert를 하게되는 경우~ 트래픽 상황에 따라 성능장애 유발을 발생시키게됩니다. )



우리가 필요로하는 상태머신기 설계



나는 FSM 개념을 사용해 다음과같이 준실시간 벌크처리 액터를 만들고 싶습니다.

초기상태는 Idle 상태이며, 이벤트 수신대기를 합니다.
이벤트를 하나라도 받게되면 액티브 상태가 되며 내부큐에 모읍니다.
Flush이벤트는 그동안 모아둔 데이터를 벌크처리하고 초기화하고 Idle상태로 만듭니다.
액티브 상태에서 모은 데이터가 100개 이상되면 Flush 이벤트를 발생시킵니다.
액티브 상태에서 10초가 흐르면 Flush이벤트를 자동 발생시킵니다. 


구현된 액터

package actor.bulkprocessor

import akka.actor.Cancellable
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.javadsl.*
import java.time.Duration

sealed class BulkProcessorCommand
data class DataEvent(val data: Any, val replyTo: ActorRef<Any>) : BulkProcessorCommand()
object Flush : BulkProcessorCommand()
private object FlushTimeout : BulkProcessorCommand() // 내부 타임아웃 이벤트

sealed class BulkProcessorResponse
data class BulkTaskCompleted(val message: String) : BulkProcessorResponse()


class BulkProcessor private constructor(
    context: ActorContext<BulkProcessorCommand>,
    private val buffer: MutableList<Any> = mutableListOf()
) : AbstractBehavior<BulkProcessorCommand>(context) {

    companion object {
        fun create(): Behavior<BulkProcessorCommand> {
            return Behaviors.setup { context ->
                BulkProcessor(context)
            }
        }
    }

    override fun createReceive(): Receive<BulkProcessorCommand> {
        return idle()
    }

    private fun idle(): Receive<BulkProcessorCommand> {
        return newReceiveBuilder()
            .onMessage(DataEvent::class.java) { event ->
                if (event.data == "testend") {
                    event.replyTo.tell(BulkTaskCompleted("Bulk task completed"))
                    flushBuffer()
                    idle()
                } else {
                    context.log.info("Received first DataEvent, switching to active state.")
                    buffer.add(event.data)
                    startFlushTimer()
                    active()
                }
            }
            .onMessage(Flush::class.java) {
                // Idle 상태에서 Flush 명령이 오면 무시하거나 로깅
                Behaviors.same()
            }
            .build()
    }

    private fun active(): Receive<BulkProcessorCommand> {
        return newReceiveBuilder()
            .onMessage(DataEvent::class.java) { event ->
                if (event.data == "testend") {
                    event.replyTo.tell(BulkTaskCompleted("Bulk task completed"))
                    flushBuffer()
                    idle()
                } else {
                    buffer.add(event.data)
                    if (buffer.size >= 100) {
                        context.log.info("Buffer size reached 100, flushing data.")
                        flushBuffer()
                        idle()
                    } else {
                        Behaviors.same()
                    }
                }
            }
            .onMessage(Flush::class.java) {
                context.log.info("Flush command received, flushing data.")
                flushBuffer()
                idle()
            }
            .onMessageEquals(FlushTimeout) {
                context.log.info("Flush timeout reached, flushing data.")
                flushBuffer()
                idle()
            }
            .build()
    }

    private fun flushBuffer() {
        // 실제 벌크 처리를 수행하는 로직을 여기에 구현
        context.log.info("Processing ${buffer.size} events.")
        buffer.clear()
        stopFlushTimer()
    }

    // 타이머 관리
    private var flushTimer: Cancellable? = null

    private fun startFlushTimer() {
        flushTimer = context.scheduleOnce(
            Duration.ofSeconds(3),
            context.self,
            FlushTimeout
        )
    }

    private fun stopFlushTimer() {
        flushTimer?.cancel()
        flushTimer = null
    }
}


유닛테스트

    @Test
    fun bulkInsertTest() {
        val probe = testKit.createTestProbe<Any>()
        val helloStateActor = testKit.spawn(BulkProcessor.create())

        // Manual Flush Test
        helloStateActor.tell(DataEvent("data1", probe.ref()))
        helloStateActor.tell(DataEvent("data2", probe.ref()))
        helloStateActor.tell(DataEvent("data3", probe.ref()))
        helloStateActor.tell(Flush)

        // Auto Flush Test over 100 , Send 110 DataEvent messages
        for (i in 1..110) {
            helloStateActor.tell(DataEvent("data$i", probe.ref()))
        }

        // Auto Flush Test TimeOut 3 seconds
        helloStateActor.tell(DataEvent("data1", probe.ref()))
        helloStateActor.tell(DataEvent("data2", probe.ref()))
        helloStateActor.tell(DataEvent("data3", probe.ref()))
        helloStateActor.tell(DataEvent("data1", probe.ref()))
        helloStateActor.tell(DataEvent("data2", probe.ref()))
        helloStateActor.tell(DataEvent("data3", probe.ref()))
        Thread.sleep(Duration.ofSeconds(5).toMillis())

        helloStateActor.tell(DataEvent("testend", probe.ref()))

        probe.expectMessage(BulkTaskCompleted("Bulk task completed"))
    }

이 테스트 코드는 BulkProcessor 액터의 동작을 검증합니다. 주요 테스트 시나리오는 다음과 같습니다:


TEST 결과

19:25:01.718 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state.
19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Flush command received, flushing data.
19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Processing 3 events.
19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state.
19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Buffer size reached 100, flushing data.
19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Processing 100 events.
19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state.
19:25:06.724 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Processing 16 events.
19:25:06.746 [BulkProcessorTest-akka.actor.default-dispatcher-2] INFO  akka.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
> Task :test

장점

  1. 효율적인 자원 활용:

  2. 확장성:

  3. 신뢰성 및 내결함성:

  4. 테스트 용이성:

  5. 유연성:

  6. 응답성:


결론

BulkProcessorTest 클래스는 다양한 조건에서 BulkProcessor 액터가 대량의 데이터 처리를 올바르게 처리하는지 효과적으로 검증합니다. 수동 및 자동 플러시 메커니즘을 모두 테스트함으로써 액터가 다양한 운영 시나리오에서 올바르게 동작하는지 확인합니다.

핵심 요점:

이러한 측면을 철저히 테스트함으로써 개발자는 BulkProcessor 액터의 견고성에 자신감을 갖고 프로덕션 환경에 도입할 수 있습니다.



이러한 컨셉은 액터를 지원하는 다양한 언어를 통해 설계하고 활용할수 있습니다.

관련링크


NEXT : 액터를 계층형으로 구성하고 자식노드를 감시하는 SuperVisor를 만들어보기