인메모리로 작동하는 액터모델에 DB를 연결해 영속성을 부여하기 위해서는 AKKA(Pekko)가 지원하는 영속성 플러그인을 활용해도되지만
ORM보다 Mybatis를 더 선호하기 때문에 R2DBC - Repository 패턴을 먼저 구현한후 액터모델에 영속성을 구현하는 방법을 알아보겠습니다.
먼저 레파지토리 패턴을 알아보고 밸류 객체를 다루는 단순한 저장소를 구현해보자
Domain-Driven Design(DDD)에서 레파지토리는 도메인 객체를 저장소로부터 검색하거나 저장하는 책임을 가지는 객체입니다.
UserRepository
: findById(id: UserId): User
또는 save(user: User): Unit
같은 메서드 제공.
사전지식 링크 : 다음은 R2DBC Repository 패턴으로 JavaObject Value를 다루는 Durable Repository 구현체입니다.
액터모델에서 Read분리는 Inmemory에서 수행되기때문에 결정적으로 Redis로 Read를 분리한 모델보다
성능이 수십배~수백배 빨라질수 있습니다. 이것은 Redis의 성능적 이슈가 아니라 Redis를 사용하기위해서
네트워크 IO를 추가로 사용해야한다란 결정적 단일지점 포인트가 있기때문입니다.
이 부분에대한 자세한 아티컬은 다음을 통해서 자세히 알수 있습니다.
참고링크 : https://petabridge.com/blog/begin-learning-actor-based-design/
액터모델을 통해 저장소를 연결해 기능을 구현한다는것은 Read는 대부분 인메모리에의해서 처리된다란것을 의미하며
이러한 액터모델 방식은 CRUD가 아닌 CQRS 패턴을 자연스럽게 활용해야한다란점에서 학습곡선이 있는것이며
액터모델은 CQRS를 단순하게 이용할수 있는 다양한 장치들을 제공해줍니다.
CQRS(Command Query Responsibility Segregation)은 **명령(Command)**과 **조회(Query)**의 책임을 분리하는 소프트웨어 설계 패턴입니다.
CQRS는 이벤트 소싱(Event Sourcing)과 자주 결합되며, 읽기와 쓰기 간 동기화를 이벤트 기반으로 처리하는 경우가 많습니다. 하지만 반드시 이벤트 소싱이 필요한 것은 아닙니다.
어떤 시스템에서 CQRS를 사용할지 결정하려면, 성능, 확장성, 복잡성 간의 균형을 잘 고려해야 합니다.
여기서는 Repository를 저수준으로 액터모델과 동기화화 하는 방법( HelloStateStoreActor) 과 , 액터모델이 지원하 StashBuffer (HelloStashActor)을 이용하는방법 두가지 구현 방식을 알아보겠습니다.
다양한 방식을 이용할수 있다란 점은 저장소를 다루는 다양한 방식을 액터모델에 적용할수 있음을 의미합니다.
class HelloStateStoreActor private constructor( private val context: ActorContext<HelloStateStoreActorCommand>, private val persistenceId : String, private val durableRepository: DurableRepository, ) : AbstractBehavior<HelloStateStoreActorCommand>(context) { companion object { fun create(persistenceId: String, durableRepository: DurableRepository): Behavior<HelloStateStoreActorCommand> { return Behaviors.setup { context -> HelloStateStoreActor(context, persistenceId, durableRepository) } } } private lateinit var helloStoreState: HelloStoreState init { context.log.info("Create HelloStateStoreActor - $persistenceId") initStoreState() } override fun createReceive(): Receive<HelloStateStoreActorCommand> { return newReceiveBuilder() .onMessage(HelloStore::class.java, this::onHello) .onMessage(HelloLimitStore::class.java, this::onHelloLimit) .onMessage(GetHelloCountStore::class.java, this::onGetHelloCount) .onMessage(GetHelloTotalCountStore::class.java, this::onGetHelloTotalCount) .onMessage(ChangeStateStore::class.java, this::onChangeState) .onMessage(ResetHelloCountStore::class.java, this::onResetHelloCount) .onSignal(PreRestart::class.java, this::onPreRestart) .build() } private val materializer = Materializer.createMaterializer(context.system) private val helloLimitSource = Source.queue<HelloLimitStore>(100, OverflowStrategy.backpressure()) .throttle(3, Duration.ofSeconds(1)) .to(Sink.foreach { cmd -> context.self.tell(HelloStore(cmd.message, cmd.replyTo)) }) .run(materializer) private fun onPreRestart(signal: PreRestart): Behavior<HelloStateStoreActorCommand> { context.log.info("HelloStateStoreActor actor restart: ") val parentPath = context.self.path().parent() val parentRef: ActorRef<SupervisorCommand> = ActorRefResolver.get(context.system).resolveActorRef(parentPath.toString()) parentRef.tell(RestartChild(context.self.path().name(), context.self)) return this } private fun onHello(command: HelloStore): Behavior<HelloStateStoreActorCommand> { if(command.message == "Crash") { throw RuntimeException("Crash") } when (helloStoreState.happyState) { HappyState.HAPPY -> { if (command.message == "Hello") { helloStoreState.helloCount++ helloStoreState.helloTotalCount++ persist(helloStoreState, true) command.replyTo.tell(HelloResponseStore("Kotlin")) context.log.info("onHello-Kotlin - helloCount: ${helloStoreState.helloCount}") } } HappyState.ANGRY -> { command.replyTo.tell(HelloResponseStore("Don't talk to me!")) } } return this } private fun onHelloLimit(command: HelloLimitStore): Behavior<HelloStateStoreActorCommand> { helloLimitSource.offer(command).thenAccept { result -> when (result) { is QueueOfferResult.`Enqueued$` -> context.log.info("Command enqueued successfully") is QueueOfferResult.`Dropped$` -> context.log.error("Command dropped") is QueueOfferResult.Failure -> context.log.error("Failed to enqueue command", result.cause()) is QueueOfferResult.`QueueClosed$` -> context.log.error("Queue was closed") } } return this } private fun onGetHelloCount(command: GetHelloCountStore): Behavior<HelloStateStoreActorCommand> { command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloCount)) context.log.info("onGetHelloCount-helloCount: ${helloStoreState.helloCount}") return this } private fun onGetHelloTotalCount(command: GetHelloTotalCountStore): Behavior<HelloStateStoreActorCommand> { command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloTotalCount)) return this } private fun onChangeState(command: ChangeStateStore): Behavior<HelloStateStoreActorCommand> { helloStoreState.happyState = command.newHappy persist(helloStoreState, true) return this } private fun onResetHelloCount(command: ResetHelloCountStore): Behavior<HelloStateStoreActorCommand> { context.log.info("Reset hello count") helloStoreState.helloCount = 0 persist(helloStoreState, true) return this } private fun persist(newState: HelloStoreState, isAsync: Boolean = false) { if(isAsync) { persistAsync(newState) } else { runBlocking { durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, newState).awaitSingle() } } } private fun persistAsync(newState: HelloStoreState) { Source.single(newState) .mapAsync(1) { state -> durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, state).toFuture() } .runWith(Sink.ignore(), materializer) } private fun initStoreState() { runBlocking { var isNew: Boolean = false var result = durableRepository.findByIdEx<HelloStoreState>(persistenceId, 1L).awaitFirstOrNull() if(result != null) { helloStoreState = result } else{ isNew = true helloStoreState = HelloStoreState(HappyState.HAPPY, 0, 0) } context.log.info("Init HelloStoreState - isNew: $isNew, $helloStoreState") } } } |
HelloStateStoreActor
는 상태 관리를 위한 강력한 기능을 제공하며, Durable Repository를 활용하여 상태를 영속적으로 유지하는 아키텍처를 구현한 예입니다. 이 코드는 Pekko Actor를 활용하여 비동기적이고 확장 가능한 상태 관리와 상태 영속화를 통합하는 점에서 특징적입니다.
Durable Repository 통합
DurableRepository
와 DurableState
를 활용해 데이터베이스에 상태를 영속적으로 저장합니다DurableRepositoryDurableStateDAOImplDurableStateDAODurableState.persist
메서드를 통해 저장합니다.초기화 로직
initStoreState
메서드를 통해 Actor가 시작될 때 데이터베이스에서 기존 상태를 로드합니다.비동기 작업 및 제한
helloLimitSource
는 메시지 처리량을 제한하며, 초당 3개의 메시지를 처리하도록 스로틀링(Throttling)을 구현했습니다.상태 전환 관리
ChangeStateStore
메시지를 통해 상태 전환이 가능하며, 이 때 새로운 상태를 영속적으로 저장합니다.재시작 로직
PreRestart
시그널을 처리하여 재시작 시 부모 Supervisor에게 재등록을 요청합니다.비동기 처리와 큐잉
Source.queue
를 활용한 메시지 처리 제한은 폭주 방지 및 메시지 손실 방지에 유리합니다.지연 최소화
자동화된 상태 관리
ResetHelloCountStore
)와 비동기 작업은 관리 부담을 줄이고 응답 시간을 최소화합니다.단일 책임 원칙 준수
데이터베이스 활용 최적화
확장성 높은 설계
sealed class HelloStashCommand //Stash data class InitialState(val happyState: HappyStashState, val helloCount: Int, val helloTotalCount: Int) : HelloStashCommand() data class DbError(val error:RuntimeException) : HelloStashCommand() data class SavaState(val state: HelloStashState, val replyTo:ActorRef<Done>) : HelloStashCommand() data class GetState(val replyTo:ActorRef<HelloStashState>) : HelloStashCommand() data object SaveSuccess : HelloStashCommand() class HelloStashActor private constructor( private val context: ActorContext<HelloStashCommand>, private val persistenceId : String, private val durableRepository: DurableRepository, private val buffer: StashBuffer<HelloStashCommand> ) { companion object { fun create(persistenceId: String, durableRepository: DurableRepository, buffer: StashBuffer<HelloStashCommand>): Behavior<HelloStashCommand> { return Behaviors.withStash(100, { Behaviors.setup { context -> context.pipeToSelf( durableRepository.findByIdEx<HelloStashState>(persistenceId, 1L).toFuture(), { value, cause -> if (cause == null) { if(value == null) { // First State InitialState(HappyStashState.HAPPY, 0, 0) } else { InitialState(value.happyState, value.helloCount, value.helloTotalCount) } } else { DbError(RuntimeException(cause)) } }) HelloStashActor(context, persistenceId, durableRepository, buffer).start() } }) } } //private lateinit var buffer: StashBuffer<HelloStashCommand> init { context.log.info("Create HelloStashStoreActor") } private fun start() : Behavior<HelloStashCommand> { return Behaviors.receive(HelloStashCommand::class.java) .onMessage(InitialState::class.java, this::onInitialState) .onMessage(DbError::class.java, this::onDBError) .onMessage(HelloStashCommand::class.java, this::stashOtherCommand) .build() } private fun onInitialState(command: InitialState): Behavior<HelloStashCommand> { var state:HelloStashState = HelloStashState(command.happyState, command.helloCount, command.helloTotalCount) return buffer.unstashAll(active(state)); } private fun onDBError(command: DbError): Behavior<HelloStashCommand> { throw command.error; } private fun stashOtherCommand(command: HelloStashCommand): Behavior<HelloStashCommand> { buffer.stash(command) return Behaviors.same() } private fun active(state:HelloStashState) : Behavior<HelloStashCommand>{ return Behaviors.receive(HelloStashCommand::class.java) .onMessage(GetState::class.java, {message -> onGetState(state, message)}) .onMessage(SavaState::class.java, this::onSaveState) .build() } private fun onGetState(state:HelloStashState, message: GetState): Behavior<HelloStashCommand> { message.replyTo.tell(state) return Behaviors.same() } private fun onSaveState( message: SavaState): Behavior<HelloStashCommand> { context.pipeToSelf( durableRepository.createOrUpdateDurableStateEx<HelloStashState>(persistenceId, 1L, message.state).toFuture(), { value, cause -> if (cause == null) { message.replyTo.tell(Done.getInstance()) SaveSuccess } else { DbError(RuntimeException(cause)) } }) return saving(message.state, message.replyTo) } private fun saving(state:HelloStashState, replyTo:ActorRef<Done>) : Behavior<HelloStashCommand> { return Behaviors.receive(HelloStashCommand::class.java) .onMessage(SaveSuccess::class.java, {message -> onSaveSucess(state, replyTo)}) .onMessage(DbError::class.java, this::onDBError) .onMessage(HelloStashCommand::class.java, this::stashOtherCommand) .build() } private fun onSaveSucess(state:HelloStashState, replyTo: ActorRef<Done>): Behavior<HelloStashCommand> { replyTo.tell(Done.getInstance()) return buffer.unstashAll(active(state)); } } |
HelloStashActor
는 Pekko의 StashBuffer
를 활용하여 메시지를 임시 저장하고, 상태 초기화 및 복구 과정을 안정적으로 처리하도록 설계되었습니다. 이 방식은 메시지가 수신될 때 상태가 초기화되지 않은 경우에도 유실되지 않도록 설계된 것이 핵심입니다.
StashBuffer를 통한 메시지 유실 방지
unstashAll
을 통해 처리합니다.durableRepository
에서 상태를 읽어오는 작업)이 비동기로 동작하기 때문에 발생할 수 있는 메시지 손실을 방지합니다.초기화 단계
durableRepository.findByIdEx
를 호출하여 데이터베이스에서 초기 상태를 로드합니다.HappyStashState.HAPPY
, helloCount = 0
, helloTotalCount = 0
)를 생성합니다.비동기 상태 저장
SavaState
명령으로 상태를 저장하며, 이 작업은 비동기로 수행됩니다.StashBuffer
에 보관됩니다.상태 관리 및 전환
active
상태로 전환 후에는 HelloStashState
를 직접 관리하며, 요청된 상태 조회(GetState
)나 상태 저장(SavaState
)을 수행합니다.에러 핸들링
DbError
)를 처리하여, 안정성을 높입니다.StashBuffer
는 초기화 과정에서 발생할 수 있는 메시지 유실을 방지합니다. 초기화 완료 후, 버퍼에 저장된 모든 메시지를 처리하므로 시스템의 안정성이 높아집니다.효율적인 메시지 처리
StashBuffer
에 보관하여 대기 시간을 줄입니다.병렬성 강화
에러로 인한 리소스 낭비 최소화
상태 관리 간소화
단일 책임 원칙 준수
고가용성 아키텍처 지원
StashBuffer
를 활용한 메시지 저장 및 상태 복구 기능은 클러스터 환경에서의 안정성을 높이며, 복잡한 장애 복구 메커니즘 없이도 효율적인 상태 관리가 가능합니다.HelloStashActor
는 메시지 유실 방지와 비동기 상태 관리를 결합한 구조로, 안정성과 효율성을 겸비한 아키텍처입니다. 고성능이 요구되는 대규모 분산 시스템에서 특히 유용하며, 복잡한 상태 관리 로직을 단순화하면서도 강력한 기능을 제공합니다.
코틀린버전으로 작성된 구현코드와 유닛테스트 코드는 다음을 통해 살펴볼수 있습니다.