인메모리로 작동하는 액터모델에 DB를 연결해 영속성을 부여하기 위해서는 AKKA(Pekko)가 지원하는 영속성 플러그인을 활용해도되지만

ORM보다 Mybatis를 더 선호하기 때문에 R2DBC - Repository 패턴을 먼저 구현한후 액터모델에 영속성을 구현하는 방법을 알아보겠습니다.

 

먼저 레파지토리 패턴을 알아보고 밸류 객체를 다루는 단순한 저장소를 구현해보자

1. 레파지토리 패턴 (Repository Pattern)

Domain-Driven Design(DDD)에서 레파지토리는 도메인 객체를 저장소로부터 검색하거나 저장하는 책임을 가지는 객체입니다.

  • 역할: 도메인 계층과 데이터 액세스 계층 간의 중재자 역할을 합니다.
  • 목적: 도메인 계층이 데이터 저장소(MySQL, MongoDB 등)의 세부 사항을 알 필요 없도록 추상화합니다.
  • :
    • UserRepository: findById(id: UserId): User 또는 save(user: User): Unit 같은 메서드 제공.

장점

  • 데이터 액세스 로직이 응용 서비스나 도메인 로직에 노출되지 않음.
  • 데이터베이스 변경 시에도 도메인 계층은 영향을 받지 않음.
  • 테스트 용이: Mock 레파지토리를 이용한 단위 테스트 가능.

2. 엔티티(Entity)와 밸류 객체(Value Object)의 차이

엔티티(Entity)

  • 정의: 고유 식별자(예: ID)를 가지고, 상태와 생명 주기를 관리하는 객체.
  • 특징:
    • 동일한 ID를 가지면 동일한 객체로 간주.
    • 상태가 변경될 수 있음(Mutable).
    • 도메인에서 중요한 비즈니스 규칙과 로직을 포함.

밸류 객체(Value Object)

  • 정의: 고유 식별자가 없고, 속성 값 자체로 객체를 구분.
  • 특징:
    • Immutable(불변): 생성 후 상태 변경 불가.
    • 값이 같으면 동일한 객체로 간주(값 기반 비교).
    • 비즈니스 로직이 거의 없고 속성을 단순히 표현하는 데 사용.

 

사전지식 링크 : 다음은 R2DBC Repository 패턴으로 JavaObject Value를 다루는 Durable Repository 구현체입니다.

 

액터모델을 통한 CQRS 성능이점

 

액터모델에서 Read분리는 Inmemory에서 수행되기때문에 결정적으로 Redis로 Read를 분리한 모델보다

성능이 수십배~수백배 빨라질수 있습니다. 이것은 Redis의 성능적 이슈가 아니라 Redis를 사용하기위해서

네트워크 IO를 추가로 사용해야한다란 결정적 단일지점 포인트가 있기때문입니다.

이 부분에대한 자세한 아티컬은 다음을 통해서 자세히 알수 있습니다.

참고링크 : https://petabridge.com/blog/begin-learning-actor-based-design/

 

액터모델을 통해 저장소를 연결해 기능을 구현한다는것은 Read는 대부분 인메모리에의해서 처리된다란것을 의미하며

이러한 액터모델 방식은 CRUD가 아닌 CQRS 패턴을 자연스럽게 활용해야한다란점에서 학습곡선이 있는것이며

액터모델은 CQRS를 단순하게 이용할수 있는 다양한 장치들을 제공해줍니다.

 

CQRS(Command Query Responsibility Segregation)은 **명령(Command)**과 **조회(Query)**의 책임을 분리하는 소프트웨어 설계 패턴입니다.

  • Command: 데이터 상태를 변경하는 작업(쓰기). 예: 생성, 업데이트, 삭제.
  • Query: 데이터를 읽는 작업(읽기). 예: 조회, 검색.

주요 특징:

  1. 책임 분리: 읽기와 쓰기를 별도의 모델로 처리하여 각 역할에 최적화된 설계를 가능하게 합니다.
  2. 스케일링 용이: 읽기와 쓰기가 독립적이므로, 각 요구 사항에 맞게 개별적으로 확장 가능합니다.
  3. 복잡도 증가: 두 개의 모델(쓰기 모델과 읽기 모델)을 유지해야 하므로 시스템 설계와 구현이 복잡해질 수 있습니다.

장점:

  • 성능 최적화: 읽기와 쓰기를 분리함으로써 더 빠르고 효율적인 처리 가능.
  • 확장성: 읽기와 쓰기의 독립적인 스케일링 가능.
  • 유연성: 쓰기 모델은 도메인 중심으로, 읽기 모델은 클라이언트의 요구에 맞춤화 가능.

단점:

  • 구현 복잡성: 두 모델 간 동기화를 관리해야 하며, 이벤트 소싱과 결합 시 복잡성이 증가.
  • 추가 인프라 필요: 상태 동기화를 위해 메시지 브로커 또는 이벤트 버스 등이 필요할 수 있음.

CQRS는 이벤트 소싱(Event Sourcing)과 자주 결합되며, 읽기와 쓰기 간 동기화를 이벤트 기반으로 처리하는 경우가 많습니다. 하지만 반드시 이벤트 소싱이 필요한 것은 아닙니다.

어떤 시스템에서 CQRS를 사용할지 결정하려면, 성능, 확장성, 복잡성 간의 균형을 잘 고려해야 합니다.

 

여기서는 Repository를 저수준으로 액터모델과 동기화화 하는 방법( HelloStateStoreActor) 과 , 액터모델이 지원하 StashBuffer (HelloStashActor)을 이용하는방법 두가지 구현 방식을 알아보겠습니다.

다양한 방식을 이용할수 있다란 점은 저장소를 다루는 다양한 방식을 액터모델에 적용할수 있음을 의미합니다.

 

class HelloStateStoreActor private constructor(
    private val context: ActorContext<HelloStateStoreActorCommand>,
    private val persistenceId : String,
    private val durableRepository: DurableRepository,
) : AbstractBehavior<HelloStateStoreActorCommand>(context) {

    companion object {
        fun create(persistenceId: String, durableRepository: DurableRepository): Behavior<HelloStateStoreActorCommand> {
            return Behaviors.setup { context -> HelloStateStoreActor(context, persistenceId, durableRepository) }
        }
    }

    private lateinit var helloStoreState: HelloStoreState

    init {
        context.log.info("Create HelloStateStoreActor - $persistenceId")
        initStoreState()
    }

    override fun createReceive(): Receive<HelloStateStoreActorCommand> {
        return newReceiveBuilder()
            .onMessage(HelloStore::class.java, this::onHello)
            .onMessage(HelloLimitStore::class.java, this::onHelloLimit)
            .onMessage(GetHelloCountStore::class.java, this::onGetHelloCount)
            .onMessage(GetHelloTotalCountStore::class.java, this::onGetHelloTotalCount)
            .onMessage(ChangeStateStore::class.java, this::onChangeState)
            .onMessage(ResetHelloCountStore::class.java, this::onResetHelloCount)
            .onSignal(PreRestart::class.java, this::onPreRestart)
            .build()
    }

    private val materializer = Materializer.createMaterializer(context.system)

    private val helloLimitSource = Source.queue<HelloLimitStore>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd ->
            context.self.tell(HelloStore(cmd.message, cmd.replyTo))
        })
        .run(materializer)

    private fun onPreRestart(signal: PreRestart): Behavior<HelloStateStoreActorCommand> {
        context.log.info("HelloStateStoreActor actor restart: ")
        val parentPath = context.self.path().parent()
        val parentRef: ActorRef<SupervisorCommand> = ActorRefResolver.get(context.system).resolveActorRef(parentPath.toString())
        parentRef.tell(RestartChild(context.self.path().name(), context.self))

        return this
    }

    private fun onHello(command: HelloStore): Behavior<HelloStateStoreActorCommand> {

        if(command.message == "Crash") {
            throw RuntimeException("Crash")
        }

        when (helloStoreState.happyState) {
            HappyState.HAPPY -> {
                if (command.message == "Hello") {
                    helloStoreState.helloCount++
                    helloStoreState.helloTotalCount++

                    persist(helloStoreState, true)

                    command.replyTo.tell(HelloResponseStore("Kotlin"))
                    context.log.info("onHello-Kotlin - helloCount: ${helloStoreState.helloCount}")
                }
            }
            HappyState.ANGRY -> {
                command.replyTo.tell(HelloResponseStore("Don't talk to me!"))
            }
        }
        return this
    }

    private fun onHelloLimit(command: HelloLimitStore): Behavior<HelloStateStoreActorCommand> {
        helloLimitSource.offer(command).thenAccept { result ->
            when (result) {
                is QueueOfferResult.`Enqueued$` -> context.log.info("Command enqueued successfully")
                is QueueOfferResult.`Dropped$` -> context.log.error("Command dropped")
                is QueueOfferResult.Failure -> context.log.error("Failed to enqueue command", result.cause())
                is QueueOfferResult.`QueueClosed$` -> context.log.error("Queue was closed")
            }
        }
        return this
    }

    private fun onGetHelloCount(command: GetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloCount))
        context.log.info("onGetHelloCount-helloCount: ${helloStoreState.helloCount}")
        return this
    }

    private fun onGetHelloTotalCount(command: GetHelloTotalCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloTotalCount))
        return this
    }

    private fun onChangeState(command: ChangeStateStore): Behavior<HelloStateStoreActorCommand> {
        helloStoreState.happyState = command.newHappy
        persist(helloStoreState, true)
        return this
    }

    private fun onResetHelloCount(command: ResetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        context.log.info("Reset hello count")
        helloStoreState.helloCount = 0
        persist(helloStoreState, true)
        return this
    }

    private fun persist(newState: HelloStoreState, isAsync: Boolean = false) {
        if(isAsync) {
            persistAsync(newState)
        }
        else {
            runBlocking {
                durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, newState).awaitSingle()
            }
        }
    }

    private fun persistAsync(newState: HelloStoreState) {
        Source.single(newState)
            .mapAsync(1) { state ->
                durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, state).toFuture()
            }
            .runWith(Sink.ignore(), materializer)
    }

    private fun initStoreState() {
        runBlocking {
            var isNew: Boolean = false
            var result = durableRepository.findByIdEx<HelloStoreState>(persistenceId, 1L).awaitFirstOrNull()
            if(result != null) {
                helloStoreState = result
            }
            else{
                isNew = true
                helloStoreState = HelloStoreState(HappyState.HAPPY, 0, 0)
            }

            context.log.info("Init HelloStoreState - isNew: $isNew, $helloStoreState")

        }
    }
}

HelloStateStoreActor는 상태 관리를 위한 강력한 기능을 제공하며, Durable Repository를 활용하여 상태를 영속적으로 유지하는 아키텍처를 구현한 예입니다. 이 코드는 Pekko Actor를 활용하여 비동기적이고 확장 가능한 상태 관리와 상태 영속화를 통합하는 점에서 특징적입니다.

분석

주요 기능

  1. Durable Repository 통합

    • DurableRepositoryDurableState를 활용해 데이터베이스에 상태를 영속적으로 저장합니다​DurableRepositoryDurableStateDAOImplDurableStateDAODurableState.
    • 상태가 변경될 때마다 비동기 방식 또는 동기 방식으로 persist 메서드를 통해 저장합니다.
  2. 초기화 로직

    • initStoreState 메서드를 통해 Actor가 시작될 때 데이터베이스에서 기존 상태를 로드합니다.
    • 상태가 없을 경우 기본 상태를 생성합니다.
  3. 비동기 작업 및 제한

    • helloLimitSource는 메시지 처리량을 제한하며, 초당 3개의 메시지를 처리하도록 스로틀링(Throttling)을 구현했습니다.
  4. 상태 전환 관리

    • ChangeStateStore 메시지를 통해 상태 전환이 가능하며, 이 때 새로운 상태를 영속적으로 저장합니다.
  5. 재시작 로직

    • PreRestart 시그널을 처리하여 재시작 시 부모 Supervisor에게 재등록을 요청합니다.

설계 이점

  • 강력한 상태 유지: 데이터베이스를 사용하여 시스템 장애 시에도 상태를 복구할 수 있습니다.
  • 확장성: Pekko Actor의 분산 처리 능력을 활용하여 고부하 환경에서도 안정적으로 동작합니다.
  • 비동기 메시지 처리: 메시지 큐를 활용해 메시지 처리량 제한 및 순서 보장이 가능합니다.

성능적 이점

  1. 비동기 처리와 큐잉

    • 스로틀링과 비동기 처리는 시스템 과부하를 방지하고, 적절한 리소스 사용률을 유지합니다.
    • 특히 Source.queue를 활용한 메시지 처리 제한은 폭주 방지 및 메시지 손실 방지에 유리합니다.
  2. 지연 최소화

    • 동기/비동기 방식의 선택적 사용으로 적시에 상태를 업데이트하며, 저장 시간 최적화를 실현합니다.
  3. 자동화된 상태 관리

    • 주기적 상태 초기화(ResetHelloCountStore)와 비동기 작업은 관리 부담을 줄이고 응답 시간을 최소화합니다.

인프라 감소 효과

  1. 단일 책임 원칙 준수

    • 데이터베이스 중심 상태 관리를 통해 복잡한 애플리케이션 로직을 단순화하고, 관리해야 할 시스템 컴포넌트를 줄일 수 있습니다.
  2. 데이터베이스 활용 최적화

    • RDBMS를 활용한 Durable Repository는 상태 유지에 있어 추가적인 캐시 또는 외부 상태 관리 도구의 필요성을 제거합니다.
  3. 확장성 높은 설계

    • Pekko의 클러스터링 기능과 Durable Repository를 결합하면 별도의 상태 관리 시스템 없이 대규모 분산 환경에서 안정성을 유지할 수 있습니다.

적용 결과

  • 고가용성: 장애가 발생하더라도 상태 복구 가능.
  • 운영 효율성: 추가적인 상태 관리 인프라 제거로 리소스 절약.
  • 확장성 확보: 클러스터링 및 메시지 제한으로 대규모 워크로드를 처리 가능.

 

 

sealed class HelloStashCommand

//Stash
data class InitialState(val happyState: HappyStashState, val helloCount: Int, val helloTotalCount: Int) : HelloStashCommand()
data class DbError(val error:RuntimeException) : HelloStashCommand()
data class SavaState(val state: HelloStashState, val replyTo:ActorRef<Done>) : HelloStashCommand()
data class GetState(val replyTo:ActorRef<HelloStashState>) : HelloStashCommand()
data object SaveSuccess : HelloStashCommand()



class HelloStashActor private constructor(
    private val context: ActorContext<HelloStashCommand>,
    private val persistenceId : String,
    private val durableRepository: DurableRepository,
    private val buffer: StashBuffer<HelloStashCommand>
) {

    companion object {
        fun create(persistenceId: String, durableRepository: DurableRepository,
                   buffer: StashBuffer<HelloStashCommand>): Behavior<HelloStashCommand> {

            return Behaviors.withStash(100, {
                    Behaviors.setup {
                        context ->
                            context.pipeToSelf(
                                durableRepository.findByIdEx<HelloStashState>(persistenceId, 1L).toFuture(),
                                { value, cause ->
                                    if (cause == null) {
                                        if(value == null) {
                                            // First State
                                            InitialState(HappyStashState.HAPPY, 0, 0)
                                        } else {
                                            InitialState(value.happyState, value.helloCount, value.helloTotalCount)
                                        }
                                    } else {
                                        DbError(RuntimeException(cause))
                                    }
                                })

                            HelloStashActor(context, persistenceId, durableRepository, buffer).start()
                    }
                })
        }
    }

    //private lateinit var buffer: StashBuffer<HelloStashCommand>

    init {
        context.log.info("Create HelloStashStoreActor")

    }

    private fun start() : Behavior<HelloStashCommand> {
        return Behaviors.receive(HelloStashCommand::class.java)
            .onMessage(InitialState::class.java, this::onInitialState)
            .onMessage(DbError::class.java, this::onDBError)
            .onMessage(HelloStashCommand::class.java, this::stashOtherCommand)
            .build()
    }

    private fun onInitialState(command: InitialState): Behavior<HelloStashCommand> {
        var state:HelloStashState = HelloStashState(command.happyState, command.helloCount, command.helloTotalCount)
        return buffer.unstashAll(active(state));
    }

    private fun onDBError(command: DbError): Behavior<HelloStashCommand> {
        throw command.error;
    }

    private fun stashOtherCommand(command: HelloStashCommand): Behavior<HelloStashCommand> {
        buffer.stash(command)
        return Behaviors.same()
    }

    private fun active(state:HelloStashState) : Behavior<HelloStashCommand>{
        return Behaviors.receive(HelloStashCommand::class.java)
            .onMessage(GetState::class.java, {message -> onGetState(state, message)})
            .onMessage(SavaState::class.java, this::onSaveState)
            .build()
    }

    private fun onGetState(state:HelloStashState, message: GetState): Behavior<HelloStashCommand> {
        message.replyTo.tell(state)
        return Behaviors.same()
    }

    private fun onSaveState( message: SavaState): Behavior<HelloStashCommand> {
        context.pipeToSelf(
            durableRepository.createOrUpdateDurableStateEx<HelloStashState>(persistenceId, 1L, message.state).toFuture(),
            { value, cause ->
                if (cause == null) {
                    message.replyTo.tell(Done.getInstance())
                    SaveSuccess
                } else {
                    DbError(RuntimeException(cause))
                }
            })
        return saving(message.state, message.replyTo)
    }


    private fun saving(state:HelloStashState, replyTo:ActorRef<Done>) : Behavior<HelloStashCommand> {
        return Behaviors.receive(HelloStashCommand::class.java)
            .onMessage(SaveSuccess::class.java, {message -> onSaveSucess(state, replyTo)})
            .onMessage(DbError::class.java, this::onDBError)
            .onMessage(HelloStashCommand::class.java, this::stashOtherCommand)
            .build()
    }

    private fun onSaveSucess(state:HelloStashState, replyTo: ActorRef<Done>): Behavior<HelloStashCommand> {
        replyTo.tell(Done.getInstance())
        return buffer.unstashAll(active(state));
    }
}

 

HelloStashActor는 Pekko의 StashBuffer를 활용하여 메시지를 임시 저장하고, 상태 초기화 및 복구 과정을 안정적으로 처리하도록 설계되었습니다. 이 방식은 메시지가 수신될 때 상태가 초기화되지 않은 경우에도 유실되지 않도록 설계된 것이 핵심입니다.

분석

주요 기능

  1. StashBuffer를 통한 메시지 유실 방지

    • 초기화 중에 수신된 메시지를 버퍼에 저장하고, 초기화가 완료되면 unstashAll을 통해 처리합니다.
    • 이는 초기화 로직(durableRepository에서 상태를 읽어오는 작업)이 비동기로 동작하기 때문에 발생할 수 있는 메시지 손실을 방지합니다.
  2. 초기화 단계

    • Actor 생성 시 durableRepository.findByIdEx를 호출하여 데이터베이스에서 초기 상태를 로드합니다.
    • 상태가 존재하지 않을 경우 기본 상태(HappyStashState.HAPPY, helloCount = 0, helloTotalCount = 0)를 생성합니다.
  3. 비동기 상태 저장

    • SavaState 명령으로 상태를 저장하며, 이 작업은 비동기로 수행됩니다.
    • 저장 작업 완료 후 메시지를 처리하며, 저장 도중 들어오는 다른 메시지는 StashBuffer에 보관됩니다.
  4. 상태 관리 및 전환

    • active 상태로 전환 후에는 HelloStashState를 직접 관리하며, 요청된 상태 조회(GetState)나 상태 저장(SavaState)을 수행합니다.
  5. 에러 핸들링

    • 데이터베이스 접근 중 발생하는 에러(DbError)를 처리하여, 안정성을 높입니다.

설계 이점

안정성

  • 메시지 유실 방지: StashBuffer는 초기화 과정에서 발생할 수 있는 메시지 유실을 방지합니다. 초기화 완료 후, 버퍼에 저장된 모든 메시지를 처리하므로 시스템의 안정성이 높아집니다.
  • 에러 처리: 데이터베이스 에러 발생 시 즉각 처리하도록 설계되어 비정상적인 상태에서의 작동을 최소화합니다.

상태 초기화

  • 데이터베이스에서 상태를 가져와 초기화하므로 상태 관리의 일관성이 유지됩니다.
  • 상태가 없을 경우 기본 값을 설정하므로 예상치 못한 에러 상황을 방지할 수 있습니다.

비동기 저장

  • 상태 저장은 비동기로 수행되어 작업 처리 중에도 Actor가 다른 메시지를 처리할 수 있는 유연성을 제공합니다.

성능적 이점

  1. 효율적인 메시지 처리

    • 초기화 및 상태 저장 중 수신되는 메시지를 StashBuffer에 보관하여 대기 시간을 줄입니다.
    • 이는 Actor가 상태 저장과 메시지 처리를 병렬적으로 수행할 수 있게 해줍니다.
  2. 병렬성 강화

    • 데이터베이스 접근과 메시지 처리가 병렬적으로 수행됨으로써 시스템의 응답성이 향상됩니다.
  3. 에러로 인한 리소스 낭비 최소화

    • 에러 발생 시 적절히 처리하므로 불필요한 리소스 소모를 줄이고 안정적인 작동을 보장합니다.

인프라 감소 효과

  1. 상태 관리 간소화

    • Durable Repository를 통해 데이터베이스에서 상태를 직접 관리하므로 외부 상태 관리 도구를 제거할 수 있습니다.
  2. 단일 책임 원칙 준수

    • 메시지 처리와 상태 저장이 명확히 분리되어 유지보수가 용이하며, 관리해야 할 인프라 요소가 줄어듭니다.
  3. 고가용성 아키텍처 지원

    • StashBuffer를 활용한 메시지 저장 및 상태 복구 기능은 클러스터 환경에서의 안정성을 높이며, 복잡한 장애 복구 메커니즘 없이도 효율적인 상태 관리가 가능합니다.

운영적 이점

  • 무중단 상태 초기화: Actor가 비동기로 상태를 초기화하는 동안에도 시스템이 지속적으로 메시지를 처리할 수 있습니다.
  • 확장성: 클러스터링 환경에서 메시지와 상태 관리를 독립적으로 수행하므로 높은 부하에서도 안정적으로 작동합니다.
  • 관리 간소화: 추가적인 메시지 큐나 상태 저장소를 통합할 필요 없이 Actor 자체에서 상태 관리와 메시지 큐잉을 지원합니다.

결론

HelloStashActor는 메시지 유실 방지와 비동기 상태 관리를 결합한 구조로, 안정성과 효율성을 겸비한 아키텍처입니다. 고성능이 요구되는 대규모 분산 시스템에서 특히 유용하며, 복잡한 상태 관리 로직을 단순화하면서도 강력한 기능을 제공합니다.

 

코틀린버전으로 작성된 구현코드와 유닛테스트 코드는 다음을 통해 살펴볼수 있습니다.

 

 

  • No labels