액터는 메시지를 실시간으로 받을수 있으며 그 메시지를 DB에 저장할수 있는 영속화(Persitent) 기능을 지원합니다.
이벤트를 먼저 설계한후 저장장치를 이후 고민할수 있는것이 액터모델의 특징이며
다음과같은 영속성장치를 스키마모델과 함께 제공합니다.
Journal
- 이벤트 소싱(Event Sourcing) 방식을 사용합니다
- 액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다
- 추가 전용(append-only) 로그 형태로 이벤트를 저장합니다
- 액터의 전체 상태 변경 이력을 보존합니다.
- 액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다
Snapshot
- 액터의 전체 상태를 특정 시점에 저장합니다
- 복구 시간을 최적화하기 위한 용도로 사용됩니다
- 전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다
- Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다
Durable State
- 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
- CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
- 상태 변경 시마다 전체 상태를 덮어씁니다.
주요 차이점:
- 저장 방식: Journal은 이벤트 로그, Snapshot은 전체 상태의 특정 시점 복사본, Durable State는 최신 상태만 저장합니다.
- 복구 프로세스: Journal은 모든 이벤트 재생, Snapshot은 최근 스냅샷 + 이후 이벤트 재생, Durable State는 최신 상태만 로드합니다.
- 데이터 보존: Journal은 전체 이력 보존, Snapshot과 Durable State는 특정 시점/최신 상태만 보존합니다.
- 사용 사례: Journal은 감사와 시간 기반 쿼리에 유용, Snapshot은 복구 최적화, Durable State는 단순한 상태 관리에 적합합니다.
여기서는 Durable State 영속장치를 알아보고 나머지 장치도 알아보겠습니다.
준비하기
의존성
// Actor Persistence implementation("com.typesafe.akka:akka-persistence-typed_$scalaVersion:$akkaVersion") implementation("com.typesafe.akka:akka-persistence-query_$scalaVersion:$akkaVersion") implementation("com.lightbend.akka:akka-persistence-r2dbc_$scalaVersion:$akkaR2DBC")
DDL
CREATE TABLE IF NOT EXISTS durable_state ( slice INT NOT NULL, entity_type VARCHAR(255) NOT NULL, persistence_id VARCHAR(255) NOT NULL, revision BIGINT NOT NULL, db_timestamp timestamp with time zone NOT NULL, state_ser_id INTEGER NOT NULL, state_ser_manifest VARCHAR(255), state_payload BYTEA NOT NULL, tags TEXT ARRAY, PRIMARY KEY(persistence_id, revision) ); -- `durable_state_slice_idx` is only needed if the slice based queries are used CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
- https://doc.akka.io/libraries/akka-persistence-r2dbc/current/index.html
- 액터모델에 탑재되는 저장 Pulugin은 JDBC / NOSQL 등 다양하게 지원하며 여기서는 r2dbc가 이용되었습니다.
- 지원 되는 플러그인에서 호환 되는 DB의 DDL을 관리하고 제공합니다.
- https://pekko.apache.org/docs/pekko-persistence-r2dbc/current/getting-started.html - Open진영인경우 pekko 참고
- 상태의 속성은 Value 객체로, state_payload 에 json형태로 저장됩니다. 저장공간을 줄이기위해 압축 바이너리로 저장도 할수 있으며 직렬화/압축 장치는 커스텀하거나 선택할수 있습니다.
- 마지막 상태값을 주로 추적하며 revision을 통해 변경횟수를 추적할수 있으며 분산저장을 하는경우 slice를 활용할수 있습니다.
CONFIG
akka { loglevel = "INFO" loggers = ["akka.event.slf4j.Slf4jLogger"] logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" actor { serializers { jackson-json = "akka.serialization.jackson.JacksonJsonSerializer" jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer" proto = "akka.remote.serialization.ProtobufSerializer" } serialization-bindings { "com.example.kotlinbootlabs.actor.PersitenceSerializable" = jackson-json } default-dispatcher { fork-join-executor { parallelism-min = 5 parallelism-factor = 2.0 parallelism-max = 10 } } } extensions = [akka.persistence.Persistence] persistence { journal { plugin = "inmemory-journal" } snapshot-store { plugin = "inmemory-snapshot-store" } state { plugin = "akka.persistence.r2dbc.state" } r2dbc { # Durable state store state { class = "akka.persistence.r2dbc.state.R2dbcDurableStateStoreProvider" table = "durable_state" assert-single-writer = on } dialect = "postgres" connection-factory { driver = "postgres" host = "localhost" database = "postgres" user = "postgres" password = "postgres" max-pool-size = 20 initial-size = 10 max-idle-time = 30000 # ssl { # enabled = on # mode = "VERIFY_CA" # root-cert = "/path/db_root.crt" # } } } } }
액터로 구현된 모델의 작동은 설정화로 도메인과 상관없는 인프라및 성능부분이 분리되어 정의되어 기본 코드는
도메인만 표현할수 있는 장점이 있긴하지만 이러한 특성으로 akka.conf 설정과 싸움을 해야할수도 있습니다.
- 직렬화 방식
- 스레드풀 지정
- 각종 커낵션 구성
- 클러스터 구성된경우 롤성정
구현 액터모델
행복한 상태일때만 인사에 응답하고 카운팅을 하는 HeloStateActor 에 영속성만 부여한 버전입니다.
enum class State { HAPPY, ANGRY } sealed class HelloPersistentStateActorCommand :PersitenceSerializable data class HelloPersistentDurable(val message: String, val replyTo: ActorRef<Any>) : HelloPersistentStateActorCommand() data class GetHelloCountPersistentDurable(val replyTo: ActorRef<Any>) : HelloPersistentStateActorCommand() data class GetHelloTotalCountPersitentDurable(val replyTo: ActorRef<Any>) : HelloPersistentStateActorCommand() //val newState: State data class ChangeState @JsonCreator constructor( @JsonProperty("newState") val newState: State, ) : HelloPersistentStateActorCommand() object ResetHelloCount : HelloPersistentStateActorCommand() sealed class HelloPersistentStateActorResponse data class HelloResponse(val message: String) : HelloPersistentStateActorResponse() data class HelloCountResponse(val count: Number) : HelloPersistentStateActorResponse() class HelloPersistentDurableStateActor private constructor( private val context: ActorContext<HelloPersistentStateActorCommand>, private val persistenceId: PersistenceId ) : DurableStateBehavior<HelloPersistentStateActorCommand, HelloState>(persistenceId) { companion object { fun create(persistenceId: PersistenceId): Behavior<HelloPersistentStateActorCommand> { return Behaviors.setup { context -> HelloPersistentDurableStateActor(context, persistenceId) } } } override fun tag(): String { return "tag1" } override fun emptyState(): HelloState = HelloState(State.HAPPY, 0, 0) override fun commandHandler(): CommandHandler<HelloPersistentStateActorCommand, HelloState> { return newCommandHandlerBuilder() .forAnyState() .onCommand(HelloPersistentDurable::class.java) { state, command -> onHello(state, command) } .onCommand(GetHelloCountPersistentDurable::class.java) { state, command -> onGetHelloCount(state, command) } .onCommand(GetHelloTotalCountPersitentDurable::class.java) { state, command -> onGetHelloTotalCount(state, command) } .onCommand(ChangeState::class.java) { state, command -> onChangeState(state, command) } .onCommand(ResetHelloCount::class.java) { state, _ -> onResetHelloCount(state) } .build() } private fun onHello(state: HelloState, command: HelloPersistentDurable): Effect<HelloState> { return when (state.state) { State.HAPPY -> { if (command.message == "Hello") { context.log.info("onHello-Kotlin") val newState = state.copy( helloCount = state.helloCount+1, helloTotalCount = state.helloTotalCount + 1, ) Effect().persist(newState).thenRun { command.replyTo.tell(HelloResponse("Kotlin")) } } else { Effect().none() } } State.ANGRY -> { command.replyTo.tell(HelloResponse("Don't talk to me!")) Effect().none() } } } private fun onGetHelloCount(state: HelloState, command: GetHelloCountPersistentDurable): Effect<HelloState> { command.replyTo.tell(HelloCountResponse(state.helloCount)) context.log.info("onGetHelloCount-helloCount: ${state.helloCount}") return Effect().none() } private fun onGetHelloTotalCount(state: HelloState, command: GetHelloTotalCountPersitentDurable): Effect<HelloState> { command.replyTo.tell(HelloCountResponse(state.helloTotalCount)) return Effect().none() } private fun onChangeState(state: HelloState, command: ChangeState): Effect<HelloState> { val newState = state.copy(state = command.newState) return Effect().persist(newState) } private fun onResetHelloCount(state: HelloState): Effect<HelloState> { context.log.info("Resetting hello count") val newState = state.copy(helloCount = 0) return Effect().persist(newState) } }
기본 액터에 추가된 개념
- 생성자 : AbstractBehavior(기본액터) - > DurableStateBehavior 로 상속을 변경하면 되며 두번째 인자가 상태를 저장할 Type과 생성시 고유 식별자인 PersitenceID를 부여받습니다.
- 초기상태 : emptyState - 최초 수행시 수행되는 초기값 설정입니다.
- 상태변경 : Effect().persist(newState) 를 통해 새로운 상태로 지정합니다.
- 이펙트는 부수효과로~ 명령에 따라 부수효과를 줄수 있습니다. 이 사이드이펙트의 효과가 우리의 기대결과와 다르게 작동하면 디펙이 됩니다.
TEST CODE
val probe: TestProbe<Any> = testKit.createTestProbe() val persistenceId = PersistenceId.ofUniqueId("HelloPersistentStateActor2") val helloPersistentDurableStateActor = testKit.spawn(HelloPersistentDurableStateActor.create(persistenceId)) // Send Hello messages helloPersistentDurableStateActor.tell(HelloPersistentDurable("Hello", probe.ref())) helloPersistentDurableStateActor.tell(HelloPersistentDurable("Hello", probe.ref())) probe.expectMessage(HelloResponse("Kotlin")) probe.expectMessage(HelloResponse("Kotlin")) // Verify the hello count helloPersistentDurableStateActor.tell(GetHelloCountPersistentDurable(probe.ref())) probe.expectMessage(HelloCountResponse(2))
- 이제 이 액터는 영속성기능이 부여되었기때문에 , 중단되어 다시 가동되더라도 마지막 상태를 보유한체로 작동이됩니다.
이제 액터 모델의 상태변경에 따라 저장장치를 채택해 동기화되는 유용한 기능을 탑재하게 되었습니다.
이러한 장치와 모델을 AKKA액터에서 제공하기때문에 CQRS가 더이상 이론이 아닌 유닛테스트를 해보고 우리의 장치에 실행해볼수 있습니다.
CRUD의 대표적인 연습 샘플이 게시판 만들기라고 하면CQRS의 대표적인 연습 샘플은 장바구니 만들기로 대용량의 트래픽을 처리할수 있도록
고안된 다음 장바구니 예제를 보는것을 추천합니다. ( 이벤트 소싱이 적용)
Next : 액터를 클러스터로 배치하기