Page History
인메모리로 작동하는 액터모델에 DB를 연결해 영속성을 부여하기 위해서는 AKKA(Pekko)가 지원하는 영속성 플러그인을 활용해도되지만
ORM보다 Mybatis를 더 선호하기 때문에 R2DBC - Repository 패턴을 먼저 구현한후 액터모델에 영속성을 구현하는 방법을 알아보겠습니다.
먼저 레파지토리 패턴을 알아보고 밸류 객첼르 객체를 다루는 단순한 저장소를 구현해보자
1. 레파지토리 패턴 (Repository Pattern)
...
- 정의: 고유 식별자가 없고, 속성 값 자체로 객체를 구분.
- 특징:
- Immutable(불변): 생성 후 상태 변경 불가.
- 값이 같으면 동일한 객체로 간주(값 기반 비교).
- 비즈니스 로직이 거의 없고 속성을 단순히 표현하는 데 사용.
사전지식 링크 : 다음은 R2DBC Repository 패턴으로 JavaObject Value를 다루는 Durable Repository 구현체입니다.
액터모델을 통한 CQRS 성능이점
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
액터모델에서 Read분리는 Inmemory에서 수행되기때문에 결정적으로 Redis로 Read를 분리한 모델보다
...
참고링크 : https://petabridge.com/blog/begin-learning-actor-based-design/
액터모델을 통해 저장소를 연결해 기능을 구현한다는것은 Read는 대부분 인메모리에의해서 처리된다란것을 의미하며
...
액터모델은 CQRS를 단순하게 이용할수 있는 다양한 장치들을 제공해줍니다.
CQRS(Command Query Responsibility Segregation)은 **명령(Command)**과 **조회(Query)**의 책임을 분리하는 소프트웨어 설계 패턴입니다.
...
어떤 시스템에서 CQRS를 사용할지 결정하려면, 성능, 확장성, 복잡성 간의 균형을 잘 고려해야 합니다.
여기서는 Repository를 저수준으로 액터모델과 동기화화 하는 방법( HelloStateStoreActor) 과 , 액터모델이 지원하 StashBuffer (HelloStashActor)을 이용하는방법 두가지 구현 방식을 알아보겠습니다.
다양한 방식을 이용할수 있다란 점은 저장소를 다루는 다양한 방식을 액터모델에 적용할수 있음을 의미합니다.
Code Block | ||
---|---|---|
| ||
class HelloStateStoreActor private constructor(
private val context: ActorContext<HelloStateStoreActorCommand>,
private val persistenceId : String,
private val durableRepository: DurableRepository,
) : AbstractBehavior<HelloStateStoreActorCommand>(context) {
companion object {
fun create(persistenceId: String, durableRepository: DurableRepository): Behavior<HelloStateStoreActorCommand> {
return Behaviors.setup { context -> HelloStateStoreActor(context, persistenceId, durableRepository) }
}
}
private lateinit var helloStoreState: HelloStoreState
init {
context.log.info("Create HelloStateStoreActor - $persistenceId")
initStoreState()
}
override fun createReceive(): Receive<HelloStateStoreActorCommand> {
return newReceiveBuilder()
.onMessage(HelloStore::class.java, this::onHello)
.onMessage(HelloLimitStore::class.java, this::onHelloLimit)
.onMessage(GetHelloCountStore::class.java, this::onGetHelloCount)
.onMessage(GetHelloTotalCountStore::class.java, this::onGetHelloTotalCount)
.onMessage(ChangeStateStore::class.java, this::onChangeState)
.onMessage(ResetHelloCountStore::class.java, this::onResetHelloCount)
.onSignal(PreRestart::class.java, this::onPreRestart)
.build()
}
private val materializer = Materializer.createMaterializer(context.system)
private val helloLimitSource = Source.queue<HelloLimitStore>(100, OverflowStrategy.backpressure())
.throttle(3, Duration.ofSeconds(1))
.to(Sink.foreach { cmd ->
context.self.tell(HelloStore(cmd.message, cmd.replyTo))
})
.run(materializer)
private fun onPreRestart(signal: PreRestart): Behavior<HelloStateStoreActorCommand> {
context.log.info("HelloStateStoreActor actor restart: ")
val parentPath = context.self.path().parent()
val parentRef: ActorRef<SupervisorCommand> = ActorRefResolver.get(context.system).resolveActorRef(parentPath.toString())
parentRef.tell(RestartChild(context.self.path().name(), context.self))
return this
}
private fun onHello(command: HelloStore): Behavior<HelloStateStoreActorCommand> {
if(command.message == "Crash") {
throw RuntimeException("Crash")
}
when (helloStoreState.happyState) {
HappyState.HAPPY -> {
if (command.message == "Hello") {
helloStoreState.helloCount++
helloStoreState.helloTotalCount++
persist(helloStoreState, true)
command.replyTo.tell(HelloResponseStore("Kotlin"))
context.log.info("onHello-Kotlin - helloCount: ${helloStoreState.helloCount}")
}
}
HappyState.ANGRY -> {
command.replyTo.tell(HelloResponseStore("Don't talk to me!"))
}
}
return this
}
private fun onHelloLimit(command: HelloLimitStore): Behavior<HelloStateStoreActorCommand> {
helloLimitSource.offer(command).thenAccept { result ->
when (result) {
is QueueOfferResult.`Enqueued$` -> context.log.info("Command enqueued successfully")
is QueueOfferResult.`Dropped$` -> context.log.error("Command dropped")
is QueueOfferResult.Failure -> context.log.error("Failed to enqueue command", result.cause())
is QueueOfferResult.`QueueClosed$` -> context.log.error("Queue was closed")
}
}
return this
}
private fun onGetHelloCount(command: GetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloCount))
context.log.info("onGetHelloCount-helloCount: ${helloStoreState.helloCount}")
return this
}
private fun onGetHelloTotalCount(command: GetHelloTotalCountStore): Behavior<HelloStateStoreActorCommand> {
command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloTotalCount))
return this
}
private fun onChangeState(command: ChangeStateStore): Behavior<HelloStateStoreActorCommand> {
helloStoreState.happyState = command.newHappy
persist(helloStoreState, true)
return this
}
private fun onResetHelloCount(command: ResetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
context.log.info("Reset hello count")
helloStoreState.helloCount = 0
persist(helloStoreState, true)
return this
}
private fun persist(newState: HelloStoreState, isAsync: Boolean = false) {
if(isAsync) {
persistAsync(newState)
}
else {
runBlocking {
durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, newState).awaitSingle()
}
}
}
private fun persistAsync(newState: HelloStoreState) {
Source.single(newState)
.mapAsync(1) { state ->
durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, state).toFuture()
}
.runWith(Sink.ignore(), materializer)
}
private fun initStoreState() {
runBlocking {
var isNew: Boolean = false
var result = durableRepository.findByIdEx<HelloStoreState>(persistenceId, 1L).awaitFirstOrNull()
if(result != null) {
helloStoreState = result
}
else{
isNew = true
helloStoreState = HelloStoreState(HappyState.HAPPY, 0, 0)
}
context.log.info("Init HelloStoreState - isNew: $isNew, $helloStoreState")
}
}
} |
...
- 고가용성: 장애가 발생하더라도 상태 복구 가능.
- 운영 효율성: 추가적인 상태 관리 인프라 제거로 리소스 절약.
- 확장성 확보: 클러스터링 및 메시지 제한으로 대규모 워크로드를 처리 가능.
Code Block | ||
---|---|---|
| ||
sealed class HelloStashCommand
//Stash
data class InitialState(val happyState: HappyStashState, val helloCount: Int, val helloTotalCount: Int) : HelloStashCommand()
data class DbError(val error:RuntimeException) : HelloStashCommand()
data class SavaState(val state: HelloStashState, val replyTo:ActorRef<Done>) : HelloStashCommand()
data class GetState(val replyTo:ActorRef<HelloStashState>) : HelloStashCommand()
data object SaveSuccess : HelloStashCommand()
class HelloStashActor private constructor(
private val context: ActorContext<HelloStashCommand>,
private val persistenceId : String,
private val durableRepository: DurableRepository,
private val buffer: StashBuffer<HelloStashCommand>
) {
companion object {
fun create(persistenceId: String, durableRepository: DurableRepository,
buffer: StashBuffer<HelloStashCommand>): Behavior<HelloStashCommand> {
return Behaviors.withStash(100, {
Behaviors.setup {
context ->
context.pipeToSelf(
durableRepository.findByIdEx<HelloStashState>(persistenceId, 1L).toFuture(),
{ value, cause ->
if (cause == null) {
if(value == null) {
// First State
InitialState(HappyStashState.HAPPY, 0, 0)
} else {
InitialState(value.happyState, value.helloCount, value.helloTotalCount)
}
} else {
DbError(RuntimeException(cause))
}
})
HelloStashActor(context, persistenceId, durableRepository, buffer).start()
}
})
}
}
//private lateinit var buffer: StashBuffer<HelloStashCommand>
init {
context.log.info("Create HelloStashStoreActor")
}
private fun start() : Behavior<HelloStashCommand> {
return Behaviors.receive(HelloStashCommand::class.java)
.onMessage(InitialState::class.java, this::onInitialState)
.onMessage(DbError::class.java, this::onDBError)
.onMessage(HelloStashCommand::class.java, this::stashOtherCommand)
.build()
}
private fun onInitialState(command: InitialState): Behavior<HelloStashCommand> {
var state:HelloStashState = HelloStashState(command.happyState, command.helloCount, command.helloTotalCount)
return buffer.unstashAll(active(state));
}
private fun onDBError(command: DbError): Behavior<HelloStashCommand> {
throw command.error;
}
private fun stashOtherCommand(command: HelloStashCommand): Behavior<HelloStashCommand> {
buffer.stash(command)
return Behaviors.same()
}
private fun active(state:HelloStashState) : Behavior<HelloStashCommand>{
return Behaviors.receive(HelloStashCommand::class.java)
.onMessage(GetState::class.java, {message -> onGetState(state, message)})
.onMessage(SavaState::class.java, this::onSaveState)
.build()
}
private fun onGetState(state:HelloStashState, message: GetState): Behavior<HelloStashCommand> {
message.replyTo.tell(state)
return Behaviors.same()
}
private fun onSaveState( message: SavaState): Behavior<HelloStashCommand> {
context.pipeToSelf(
durableRepository.createOrUpdateDurableStateEx<HelloStashState>(persistenceId, 1L, message.state).toFuture(),
{ value, cause ->
if (cause == null) {
message.replyTo.tell(Done.getInstance())
SaveSuccess
} else {
DbError(RuntimeException(cause))
}
})
return saving(message.state, message.replyTo)
}
private fun saving(state:HelloStashState, replyTo:ActorRef<Done>) : Behavior<HelloStashCommand> {
return Behaviors.receive(HelloStashCommand::class.java)
.onMessage(SaveSuccess::class.java, {message -> onSaveSucess(state, replyTo)})
.onMessage(DbError::class.java, this::onDBError)
.onMessage(HelloStashCommand::class.java, this::stashOtherCommand)
.build()
}
private fun onSaveSucess(state:HelloStashState, replyTo: ActorRef<Done>): Behavior<HelloStashCommand> {
replyTo.tell(Done.getInstance())
return buffer.unstashAll(active(state));
}
} |
HelloStashActor
는 Pekko의 StashBuffer
를 활용하여 메시지를 임시 저장하고, 상태 초기화 및 복구 과정을 안정적으로 처리하도록 설계되었습니다. 이 방식은 메시지가 수신될 때 상태가 초기화되지 않은 경우에도 유실되지 않도록 설계된 것이 핵심입니다.
...
HelloStashActor
는 메시지 유실 방지와 비동기 상태 관리를 결합한 구조로, 안정성과 효율성을 겸비한 아키텍처입니다. 고성능이 요구되는 대규모 분산 시스템에서 특히 유용하며, 복잡한 상태 관리 로직을 단순화하면서도 강력한 기능을 제공합니다.
코틀린버전으로 작성된 구현코드와 유닛테스트 코드는 다음을 통해 살펴볼수 있습니다.
- 구현코드 : https://github.com/psmon/kopring-reactive-labs/tree/main/KotlinBootReactiveLabs/src/main/kotlin/org/example/kotlinbootreactivelabs/actor/state/store
- 테스트 코드 : https://github.com/psmon/kopring-reactive-labs/tree/main/KotlinBootReactiveLabs/src/test/kotlin/org/example/kotlinbootreactivelabs/actor/state/store