FSM 개념을 사용해 긴시간마다 작동되는 배치처리가 아닌, 준실시간성 벌크처리 액터를 구현시도를 해보겠습니다.
위키백과, 우리 모두의 백과사전.
유한 상태 기계(finite-state machine, FSM) 또는 유한 오토마톤(finite automaton, FA; 복수형: 유한 오토마타 finite automata)는 컴퓨터 프로그램과 전자 논리 회로를 설계하는데에 쓰이는 수학적 모델이다. 간단히 상태 기계라고 부르기도 한다. 유한 상태 기계는 유한한 개수의 상태를 가질 수 있는 오토마타, 즉 추상 기계라고 할 수 있다. 이러한 기계는 한 번에 오로지 하나의 상태만을 가지게 되며, 현재 상태(Current State)란 임의의 주어진 시간의 상태를 칭한다. 이러한 기계는 어떠한 사건(Event)에 의해 한 상태에서 다른 상태로 변화할 수 있으며, 이를 전이(Transition)이라 한다. 특정한 유한 오토마톤은 현재 상태로부터 가능한 전이 상태와, 이러한 전이를 유발하는 조건들의 집합으로서 정의된다.
이 아이디어는 단순하지만~ 불규칙적으로 발생하는 이벤트를 안정적으로 처리하는 벌크처리기에 적용 할수 있습니다.
( 이벤트가 발생할때마다 1 Insert를 하게되는 경우~ 트래픽 상황에 따라 성능장애 유발을 발생시키게됩니다. )
![]()
나는 FSM 개념을 사용해 다음과같이 준실시간 벌크처리 액터를 만들고 싶습니다. 초기상태는 Idle 상태이며, 이벤트 수신대기를 합니다. |
package actor.bulkprocessor
import akka.actor.Cancellable
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.javadsl.*
import java.time.Duration
sealed class BulkProcessorCommand
data class DataEvent(val data: Any, val replyTo: ActorRef<Any>) : BulkProcessorCommand()
object Flush : BulkProcessorCommand()
private object FlushTimeout : BulkProcessorCommand() // 내부 타임아웃 이벤트
sealed class BulkProcessorResponse
data class BulkTaskCompleted(val message: String) : BulkProcessorResponse()
class BulkProcessor private constructor(
context: ActorContext<BulkProcessorCommand>,
private val buffer: MutableList<Any> = mutableListOf()
) : AbstractBehavior<BulkProcessorCommand>(context) {
companion object {
fun create(): Behavior<BulkProcessorCommand> {
return Behaviors.setup { context ->
BulkProcessor(context)
}
}
}
override fun createReceive(): Receive<BulkProcessorCommand> {
return idle()
}
private fun idle(): Receive<BulkProcessorCommand> {
return newReceiveBuilder()
.onMessage(DataEvent::class.java) { event ->
if (event.data == "testend") {
event.replyTo.tell(BulkTaskCompleted("Bulk task completed"))
flushBuffer()
idle()
} else {
context.log.info("Received first DataEvent, switching to active state.")
buffer.add(event.data)
startFlushTimer()
active()
}
}
.onMessage(Flush::class.java) {
// Idle 상태에서 Flush 명령이 오면 무시하거나 로깅
Behaviors.same()
}
.build()
}
private fun active(): Receive<BulkProcessorCommand> {
return newReceiveBuilder()
.onMessage(DataEvent::class.java) { event ->
if (event.data == "testend") {
event.replyTo.tell(BulkTaskCompleted("Bulk task completed"))
flushBuffer()
idle()
} else {
buffer.add(event.data)
if (buffer.size >= 100) {
context.log.info("Buffer size reached 100, flushing data.")
flushBuffer()
idle()
} else {
Behaviors.same()
}
}
}
.onMessage(Flush::class.java) {
context.log.info("Flush command received, flushing data.")
flushBuffer()
idle()
}
.onMessageEquals(FlushTimeout) {
context.log.info("Flush timeout reached, flushing data.")
flushBuffer()
idle()
}
.build()
}
private fun flushBuffer() {
// 실제 벌크 처리를 수행하는 로직을 여기에 구현
context.log.info("Processing ${buffer.size} events.")
buffer.clear()
stopFlushTimer()
}
// 타이머 관리
private var flushTimer: Cancellable? = null
private fun startFlushTimer() {
flushTimer = context.scheduleOnce(
Duration.ofSeconds(3),
context.self,
FlushTimeout
)
}
private fun stopFlushTimer() {
flushTimer?.cancel()
flushTimer = null
}
} |
@Test
fun bulkInsertTest() {
val probe = testKit.createTestProbe<Any>()
val helloStateActor = testKit.spawn(BulkProcessor.create())
// Manual Flush Test
helloStateActor.tell(DataEvent("data1", probe.ref()))
helloStateActor.tell(DataEvent("data2", probe.ref()))
helloStateActor.tell(DataEvent("data3", probe.ref()))
helloStateActor.tell(Flush)
// Auto Flush Test over 100 , Send 110 DataEvent messages
for (i in 1..110) {
helloStateActor.tell(DataEvent("data$i", probe.ref()))
}
// Auto Flush Test TimeOut 3 seconds
helloStateActor.tell(DataEvent("data1", probe.ref()))
helloStateActor.tell(DataEvent("data2", probe.ref()))
helloStateActor.tell(DataEvent("data3", probe.ref()))
helloStateActor.tell(DataEvent("data1", probe.ref()))
helloStateActor.tell(DataEvent("data2", probe.ref()))
helloStateActor.tell(DataEvent("data3", probe.ref()))
Thread.sleep(Duration.ofSeconds(5).toMillis())
helloStateActor.tell(DataEvent("testend", probe.ref()))
probe.expectMessage(BulkTaskCompleted("Bulk task completed"))
} |
이 테스트 코드는 BulkProcessor 액터의 동작을 검증합니다. 주요 테스트 시나리오는 다음과 같습니다:
manualTime.timePasses(Duration.ofSeconds(5)) 을 이용하면 가상의 시간을 흘려보내 기다림없이 시간테스트가 가능합니다.
19:25:01.718 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state. 19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Flush command received, flushing data. 19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Processing 3 events. 19:25:01.719 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state. 19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Buffer size reached 100, flushing data. 19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Processing 100 events. 19:25:01.721 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Received first DataEvent, switching to active state. 19:25:06.724 [BulkProcessorTest-akka.actor.default-dispatcher-4] INFO actor.bulkprocessor.BulkProcessor - Processing 16 events. 19:25:06.746 [BulkProcessorTest-akka.actor.default-dispatcher-2] INFO akka.actor.CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason] > Task :test |
효율적인 자원 활용:
확장성:
신뢰성 및 내결함성:
테스트 용이성:
ManualTime을 사용하면 테스트에서 시간 종속적인 동작을 정확하게 제어하여 신뢰할 수 있는 테스트를 수행할 수 있습니다.ActorTestKit과 TestProbe를 사용하여 격리된 테스트 환경을 제공함으로써 테스트가 결정론적이고 부작용이 없도록 합니다.유연성:
응답성:
BulkTaskCompleted 메시지를 기대하고 확인함으로써 작업 완료 시 시스템이 피드백을 제공하는지 확인합니다.BulkProcessorTest 클래스는 다양한 조건에서 BulkProcessor 액터가 대량의 데이터 처리를 올바르게 처리하는지 효과적으로 검증합니다. 수동 및 자동 플러시 메커니즘을 모두 테스트함으로써 액터가 다양한 운영 시나리오에서 올바르게 동작하는지 확인합니다.
핵심 요점:
이러한 측면을 철저히 테스트함으로써 개발자는 BulkProcessor 액터의 견고성에 자신감을 갖고 프로덕션 환경에 도입할 수 있습니다.
이러한 컨셉은 액터를 지원하는 다양한 언어를 통해 설계하고 활용할수 있습니다. |
NEXT : 액터를 계층형으로 구성하고 자식노드를 감시하는 SuperVisor를 만들어보기