FSM 개념을 사용해 긴시간마다 작동되는 배치처리가 아닌, 준실시간성 벌크처리 액터를 구현시도를 해보겠습니다.

유한상태기계(Finite State Machine)

위키백과, 우리 모두의 백과사전.

유한 상태 기계(finite-state machine, FSM) 또는 유한 오토마톤(finite automaton, FA; 복수형: 유한 오토마타 finite automata)는 컴퓨터 프로그램과 전자 논리 회로를 설계하는데에 쓰이는 수학적 모델이다. 간단히 상태 기계라고 부르기도 한다. 유한 상태 기계는 유한한 개수의 상태를 가질 수 있는 오토마타, 즉 추상 기계라고 할 수 있다. 이러한 기계는 한 번에 오로지 하나의 상태만을 가지게 되며, 현재 상태(Current State)란 임의의 주어진 시간의 상태를 칭한다. 이러한 기계는 어떠한 사건(Event)에 의해 한 상태에서 다른 상태로 변화할 수 있으며, 이를 전이(Transition)이라 한다. 특정한 유한 오토마톤은 현재 상태로부터 가능한 전이 상태와, 이러한 전이를 유발하는 조건들의 집합으로서 정의된다.


'안녕?'을  한단어씩 검사하는 상태머신


  • 유한 상태 기계는 자신이 취할 수 있는 유한한 갯수의 상태들을 가진다.
  • 그리고 그 중에서 반드시 하나의 상태만 취한다.
  • 현재 상태는 특정 조건이 되면 다른 상태로 변할 수 있다.
  • 유한 상태 기계는 가능한 상태들의 집합과 각 상태들의 전이 조건으로 정의 될 수 있다.
  • 상태들의 노드와 그 노드들을 연결하는 조건의 엣지로 표현할 수 있다(그래프).


이 아이디어는 단순하지만~ 불규칙적으로 발생하는 이벤트를 안정적으로 처리하는 벌크처리기에 적용 할수 있습니다.

( 이벤트가 발생할때마다 1 Insert를 하게되는 경우~ 트래픽 상황에 따라 성능장애 유발을 발생시키게됩니다. )



우리가 필요로하는 상태머신기 설계



코드생성 프롬프트

나는 FSM 개념을 사용해 다음과같이 준실시간 벌크처리 액터를 만들고 싶습니다.

초기상태는 Idle 상태이며, 이벤트 수신대기를 합니다.
이벤트를 하나라도 받게되면 액티브 상태가 되며 내부큐에 모읍니다.
Flush이벤트는 그동안 모아둔 데이터를 벌크처리하고 초기화하고 Idle상태로 만듭니다.
액티브 상태에서 모은 데이터가 100개 이상되면 Flush 이벤트를 발생시킵니다.
액티브 상태에서 10초가 흐르면 Flush이벤트를 자동 발생시킵니다. 


구현된 액터

package actor.bulkprocessor

import akka.actor.Cancellable
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.javadsl.*
import java.time.Duration

sealed class BulkProcessorCommand
data class DataEvent(val data: Any, val replyTo: ActorRef<Any>) : BulkProcessorCommand()
object Flush : BulkProcessorCommand()
private object FlushTimeout : BulkProcessorCommand() // 내부 타임아웃 이벤트

sealed class BulkProcessorResponse
data class BulkTaskCompleted(val message: String) : BulkProcessorResponse()


class BulkProcessor private constructor(
    context: ActorContext<BulkProcessorCommand>,
    private val buffer: MutableList<Any> = mutableListOf()
) : AbstractBehavior<BulkProcessorCommand>(context) {

    companion object {
        fun create(): Behavior<BulkProcessorCommand> {
            return Behaviors.setup { context ->
                BulkProcessor(context)
            }
        }
    }

    override fun createReceive(): Receive<BulkProcessorCommand> {
        return idle()
    }

    private fun idle(): Receive<BulkProcessorCommand> {
        return newReceiveBuilder()
            .onMessage(DataEvent::class.java) { event ->
                if (event.data == "testend") {
                    event.replyTo.tell(BulkTaskCompleted("Bulk task completed"))
                    flushBuffer()
                    idle()
                } else {
                    context.log.info("Received first DataEvent, switching to active state.")
                    buffer.add(event.data)
                    startFlushTimer()
                    active()
                }
            }
            .onMessage(Flush::class.java) {
                // Idle 상태에서 Flush 명령이 오면 무시하거나 로깅
                Behaviors.same()
            }
            .build()
    }

    private fun active(): Receive<BulkProcessorCommand> {
        return newReceiveBuilder()
            .onMessage(DataEvent::class.java) { event ->
                if (event.data == "testend") {
                    event.replyTo.tell(BulkTaskCompleted("Bulk task completed"))
                    flushBuffer()
                    idle()
                } else {
                    buffer.add(event.data)
                    if (buffer.size >= 100) {
                        context.log.info("Buffer size reached 100, flushing data.")
                        flushBuffer()
                        idle()
                    } else {
                        Behaviors.same()
                    }
                }
            }
            .onMessage(Flush::class.java) {
                context.log.info("Flush command received, flushing data.")
                flushBuffer()
                idle()
            }
            .onMessageEquals(FlushTimeout) {
                context.log.info("Flush timeout reached, flushing data.")
                flushBuffer()
                idle()
            }
            .build()
    }

    private fun flushBuffer() {
        // 실제 벌크 처리를 수행하는 로직을 여기에 구현
        context.log.info("Processing ${buffer.size} events.")
        buffer.clear()
        stopFlushTimer()
    }

    // 타이머 관리
    private var flushTimer: Cancellable? = null

    private fun startFlushTimer() {
        flushTimer = context.scheduleOnce(
            Duration.ofSeconds(3),
            context.self,
            FlushTimeout
        )
    }

    private fun stopFlushTimer() {
        flushTimer?.cancel()
        flushTimer = null
    }
}


유닛테스트

    @Test
    fun bulkInsertTest() {
        val probe = testKit.createTestProbe<Any>()
        val helloStateActor = testKit.spawn(BulkProcessor.create())

        // Manual Flush Test
        helloStateActor.tell(DataEvent("data1", probe.ref()))
        helloStateActor.tell(DataEvent("data2", probe.ref()))
        helloStateActor.tell(DataEvent("data3", probe.ref()))
        helloStateActor.tell(Flush)

        // Auto Flush Test over 100 , Send 110 DataEvent messages
        for (i in 1..110) {
            helloStateActor.tell(DataEvent("data$i", probe.ref()))
        }

        // Auto Flush Test TimeOut 3 seconds
        helloStateActor.tell(DataEvent("data1", probe.ref()))
        helloStateActor.tell(DataEvent("data2", probe.ref()))
        helloStateActor.tell(DataEvent("data3", probe.ref()))
        helloStateActor.tell(DataEvent("data1", probe.ref()))
        helloStateActor.tell(DataEvent("data2", probe.ref()))
        helloStateActor.tell(DataEvent("data3", probe.ref()))
        Thread.sleep(Duration.ofSeconds(5).toMillis())

        helloStateActor.tell(DataEvent("testend", probe.ref()))

        probe.expectMessage(BulkTaskCompleted("Bulk task completed"))
    }

이 테스트 코드는 BulkProcessor 액터의 동작을 검증합니다. 주요 테스트 시나리오는 다음과 같습니다:

  • Manual Flush Test: 수동으로 Flush 명령을 보내서 버퍼를 비우는 테스트입니다.
  • Auto Flush Test (Buffer Size): 100개 이상의 DataEvent 메시지를 보내서 버퍼가 자동으로 비워지는지 테스트합니다.
  • Auto Flush Test (Timeout): 3초 타임아웃이 지나면 버퍼가 자동으로 비워지는지 테스트합니다.
  • manualTime.timePasses(Duration.ofSeconds(5)) 을 이용하면 가상의 시간을 흘려보내 기다림없이 시간테스트가 가능합니다.


TEST 결과

19:25:01.718 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state.
19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Flush command received, flushing data.
19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Processing 3 events.
19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state.
19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Buffer size reached 100, flushing data.
19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Processing 100 events.
19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state.
19:25:06.724 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO  actor.bulkprocessor.BulkProcessor - Processing 16 events.
19:25:06.746 [BulkProcessorTest-akka.actor.default-dispatcher-2] INFO  akka.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
> Task :test

장점

  1. 효율적인 자원 활용:

    • 배치 처리: 데이터를 누적하여 일괄 처리함으로써 개별 메시지를 처리하는 데 따른 오버헤드를 줄여 성능을 향상시킵니다.
    • I/O 작업 감소: 데이터베이스 쓰기나 네트워크 호출과 같은 비용이 많이 드는 I/O 작업을 최소화하여 처리량을 높입니다.
  2. 확장성:

    • 자동 플러시 메커니즘: 액터가 버퍼 크기나 타임아웃에 따라 자동으로 플러시하여 수동 개입 없이 다양한 데이터 양을 처리할 수 있습니다.
    • 동시성: Akka 액터를 활용하여 시스템이 여러 작업을 동시에 처리할 수 있으며, 수평 확장이 가능합니다.
  3. 신뢰성 및 내결함성:

    • 백프레셔 처리: 데이터 처리 속도를 제어하여 다운스트림 시스템에 과부하가 걸리는 것을 방지합니다.
    • 타임아웃: 자동 타임아웃을 통해 저트래픽 기간에도 데이터가 적시에 처리되도록 보장합니다.
  4. 테스트 용이성:

    • 수동 시간 제어: ManualTime을 사용하면 테스트에서 시간 종속적인 동작을 정확하게 제어하여 신뢰할 수 있는 테스트를 수행할 수 있습니다.
    • 격리성: ActorTestKitTestProbe를 사용하여 격리된 테스트 환경을 제공함으로써 테스트가 결정론적이고 부작용이 없도록 합니다.
  5. 유연성:

    • 수동 및 자동 제어: 수동 플러시 명령과 버퍼 크기 또는 타임아웃 기반 자동 트리거를 모두 지원하여 다양한 운영 시나리오에서 유연성을 제공합니다.
    • 구성 가능한 파라미터: 버퍼 크기와 타임아웃을 조정하여 다양한 워크로드 요구 사항에 맞출 수 있습니다.
  6. 응답성:

    • 적시 처리: 자동 플러시를 통해 데이터가 너무 오래 처리되지 않고 남아 있지 않도록 하여 시스템의 응답성을 유지합니다.
    • 즉각적인 피드백: BulkTaskCompleted 메시지를 기대하고 확인함으로써 작업 완료 시 시스템이 피드백을 제공하는지 확인합니다.

결론

BulkProcessorTest 클래스는 다양한 조건에서 BulkProcessor 액터가 대량의 데이터 처리를 올바르게 처리하는지 효과적으로 검증합니다. 수동 및 자동 플러시 메커니즘을 모두 테스트함으로써 액터가 다양한 운영 시나리오에서 올바르게 동작하는지 확인합니다.

핵심 요점:

  • 효율성: 배치 처리를 통해 자원 활용을 최적화하고 시스템 성능을 향상시킵니다.
  • 확장성: 액터 모델과 자동 플러시를 통해 시스템이 대량의 데이터를 처리할 수 있습니다.
  • 신뢰성: 타임아웃과 버퍼 임계값을 통해 데이터 손실과 시스템 과부하를 방지합니다.
  • 테스트 가능성: Akka의 테스트 도구를 사용하여 정확하고 신뢰할 수 있는 유닛 테스트를 작성할 수 있으며, 이는 고품질 코드를 유지하는 데 중요합니다.
  • 유연성 및 응답성: 시스템이 다양한 워크로드에 적응하면서 데이터 처리를 적시에 보장합니다.

이러한 측면을 철저히 테스트함으로써 개발자는 BulkProcessor 액터의 견고성에 자신감을 갖고 프로덕션 환경에 도입할 수 있습니다.


git : https://github.com/psmon/java-labs/blob/master/KotlinBootLabs/src/test/kotlin/actor/bulkprocessor/BulkProcessorTest.kt


NEXT : 액터를 계층형으로 구성하고 자식노드를 감시하는 SuperVisor를 만들기




  • No labels