![Akka > [Kotlin] PersistentDurableStateActor > eventsourced-actor.gif](/download/attachments/93946389/eventsourced-actor.gif?version=1&modificationDate=1729950092700&api=v2)
액터는 메시지를 실시간으로 받을 수 있으며, 그 메시지를 DB에 저장할 수 있는 영속화(Persistent) 기능을 지원합니다.
이벤트를 먼저 설계한 후 저장 장치를 나중에 고민할 수 있는 것이 액터 모델의 특징이며
다음과 같은 영속성 장치를 스키마 모델과 함께 제공합니다.
주요 차이점:
여기서는 Durable State 영속장치를 알아보고 나머지 장치도 알아보겠습니다.
// Actor Persistence
implementation("com.typesafe.akka:akka-persistence-typed_$scalaVersion:$akkaVersion")
implementation("com.typesafe.akka:akka-persistence-query_$scalaVersion:$akkaVersion")
implementation("com.lightbend.akka:akka-persistence-r2dbc_$scalaVersion:$akkaR2DBC") |
-- Table that stores the durable state rows; its name matches the
-- `table = "durable_state"` setting of the R2DBC state store configuration.
-- NOTE(review): column meanings noted below are inferred from the column names;
-- confirm against the plugin documentation before relying on them.
CREATE TABLE IF NOT EXISTS durable_state (
slice INT NOT NULL,
entity_type VARCHAR(255) NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
-- presumably a per-entity version counter — TODO confirm
revision BIGINT NOT NULL,
db_timestamp timestamp with time zone NOT NULL,
-- presumably identify the serializer used for state_payload — TODO confirm
state_ser_id INTEGER NOT NULL,
state_ser_manifest VARCHAR(255),
state_payload BYTEA NOT NULL,
tags TEXT ARRAY,
-- uniqueness is enforced on the (persistence_id, revision) pair
PRIMARY KEY(persistence_id, revision)
);
-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
|
# Akka configuration for the durable-state sample: SLF4J logging, Jackson
# serialization, dispatcher sizing and the R2DBC (PostgreSQL) state store.
akka {
  # Route Akka logging through SLF4J.
  loglevel = "INFO"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    serializers {
      jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }

    # Types bound to this marker are serialized with the jackson-json serializer.
    serialization-bindings {
      "com.example.kotlinbootlabs.actor.PersitenceSerializable" = jackson-json
    }

    default-dispatcher {
      fork-join-executor {
        parallelism-min = 5
        parallelism-factor = 2.0
        parallelism-max = 10
      }
    }
  }

  extensions = [akka.persistence.Persistence]

  persistence {
    # NOTE(review): the "inmemory-*" plugin ids are not defined in this snippet;
    # they must be provided by configuration elsewhere.
    journal {
      plugin = "inmemory-journal"
    }
    snapshot-store {
      plugin = "inmemory-snapshot-store"
    }

    # Durable state is stored through the r2dbc state section defined below.
    state {
      plugin = "akka.persistence.r2dbc.state"
    }

    r2dbc {
      # Durable state store
      state {
        class = "akka.persistence.r2dbc.state.R2dbcDurableStateStoreProvider"
        table = "durable_state"
        assert-single-writer = on
      }

      dialect = "postgres"

      connection-factory {
        driver = "postgres"
        host = "localhost"
        database = "postgres"
        user = "postgres"
        password = "postgres"

        # Connection pool sizing. The upper bound key of the plugin's
        # connection-factory is `max-size`; the former `max-pool-size` key is
        # not a recognized setting and was therefore ignored.
        max-size = 20
        initial-size = 10

        # No unit given: HOCON durations without a unit are read as
        # milliseconds, i.e. 30 seconds — confirm this is the intended value.
        max-idle-time = 30000

        # ssl {
        #   enabled = on
        #   mode = "VERIFY_CA"
        #   root-cert = "/path/db_root.crt"
        # }
      }
    }
  }
}
액터로 구현된 모델은 도메인과 상관없는 인프라 및 성능 부분이 설정으로 분리되어 정의되므로, 기본 코드는
도메인만 표현할 수 있다는 장점이 있습니다. 다만 이러한 특성 때문에 akka.conf 설정과 씨름해야 할 수도 있습니다.
행복한 상태일 때만 인사에 응답하고 카운팅을 하는 HelloStateActor에 영속성만 부여한 버전입니다.
/** Mood of the greeter; declared in the order HAPPY, ANGRY. */
enum class State { HAPPY, ANGRY }
/**
 * Closed set of commands understood by the durable-state hello actor.
 *
 * Every command carries the [PersitenceSerializable] marker through this base class.
 */
sealed class HelloPersistentStateActorCommand : PersitenceSerializable

/** Greeting carrying a [message]; any answer is sent to [replyTo]. */
data class HelloPersistentDurable(
    val message: String,
    val replyTo: ActorRef<Any>,
) : HelloPersistentStateActorCommand()

/** Asks for the current hello count; the answer is sent to [replyTo]. */
data class GetHelloCountPersistentDurable(
    val replyTo: ActorRef<Any>,
) : HelloPersistentStateActorCommand()

/** Asks for the total hello count; the answer is sent to [replyTo]. */
data class GetHelloTotalCountPersitentDurable(
    val replyTo: ActorRef<Any>,
) : HelloPersistentStateActorCommand()

/**
 * Requests a switch to [newState].
 *
 * The constructor is annotated explicitly for Jackson
 * (presumably so the single-property class can be deserialized — confirm).
 */
data class ChangeState @JsonCreator constructor(
    @JsonProperty("newState")
    val newState: State,
) : HelloPersistentStateActorCommand()

/** Requests that the hello count be set back to zero. */
object ResetHelloCount : HelloPersistentStateActorCommand()
/** Closed set of replies produced by the hello actor. */
sealed class HelloPersistentStateActorResponse

/** Textual reply to a greeting. */
data class HelloResponse(
    val message: String,
) : HelloPersistentStateActorResponse()

/** Numeric reply carrying a counter value in [count]. */
data class HelloCountResponse(
    val count: Number,
) : HelloPersistentStateActorResponse()
/**
 * Durable-state actor that answers greetings and keeps its counters in a [HelloState].
 *
 * Commands that change the state return `Effect().persist(...)`; read-only commands
 * return `Effect().none()`. Instances are obtained through [create].
 */
class HelloPersistentDurableStateActor private constructor(
    private val context: ActorContext<HelloPersistentStateActorCommand>,
    private val persistenceId: PersistenceId,
) : DurableStateBehavior<HelloPersistentStateActorCommand, HelloState>(persistenceId) {

    /** Tag reported for this behavior. */
    override fun tag(): String = "tag1"

    /** Initial state: happy, with both counters at zero. */
    override fun emptyState(): HelloState =
        HelloState(state = State.HAPPY, helloCount = 0, helloTotalCount = 0)

    /** Registers one handler per command type; the handlers apply in any state. */
    override fun commandHandler(): CommandHandler<HelloPersistentStateActorCommand, HelloState> =
        newCommandHandlerBuilder()
            .forAnyState()
            .onCommand(HelloPersistentDurable::class.java) { current, cmd -> onHello(current, cmd) }
            .onCommand(GetHelloCountPersistentDurable::class.java) { current, cmd -> onGetHelloCount(current, cmd) }
            .onCommand(GetHelloTotalCountPersitentDurable::class.java) { current, cmd ->
                onGetHelloTotalCount(current, cmd)
            }
            .onCommand(ChangeState::class.java) { current, cmd -> onChangeState(current, cmd) }
            .onCommand(ResetHelloCount::class.java) { current, _ -> onResetHelloCount(current) }
            .build()

    /** Dispatches a greeting according to the current mood. */
    private fun onHello(state: HelloState, command: HelloPersistentDurable): Effect<HelloState> =
        when (state.state) {
            State.HAPPY -> greetWhenHappy(state, command)
            State.ANGRY -> {
                command.replyTo.tell(HelloResponse("Don't talk to me!"))
                Effect().none()
            }
        }

    /**
     * Happy-path greeting: only the exact message "Hello" is counted and answered;
     * any other message leaves the state untouched and sends no reply.
     */
    private fun greetWhenHappy(state: HelloState, command: HelloPersistentDurable): Effect<HelloState> {
        if (command.message != "Hello") return Effect().none()

        context.log.info("onHello-Kotlin")
        val greeted = state.copy(
            helloCount = state.helloCount + 1,
            helloTotalCount = state.helloTotalCount + 1,
        )
        // The reply is attached with thenRun on the persist effect.
        return Effect().persist(greeted).thenRun {
            command.replyTo.tell(HelloResponse("Kotlin"))
        }
    }

    /** Replies with the current hello count and logs it. */
    private fun onGetHelloCount(state: HelloState, command: GetHelloCountPersistentDurable): Effect<HelloState> {
        val current = state.helloCount
        command.replyTo.tell(HelloCountResponse(current))
        context.log.info("onGetHelloCount-helloCount: $current")
        return Effect().none()
    }

    /** Replies with the total hello count. */
    private fun onGetHelloTotalCount(
        state: HelloState,
        command: GetHelloTotalCountPersitentDurable,
    ): Effect<HelloState> {
        val total = state.helloTotalCount
        command.replyTo.tell(HelloCountResponse(total))
        return Effect().none()
    }

    /** Persists the mood requested by the command. */
    private fun onChangeState(state: HelloState, command: ChangeState): Effect<HelloState> =
        Effect().persist(state.copy(state = command.newState))

    /** Persists a state whose hello count is zero; the total count is not modified. */
    private fun onResetHelloCount(state: HelloState): Effect<HelloState> {
        context.log.info("Resetting hello count")
        return Effect().persist(state.copy(helloCount = 0))
    }

    companion object {
        /** Creates the behavior for [persistenceId], obtaining the actor context via `Behaviors.setup`. */
        fun create(persistenceId: PersistenceId): Behavior<HelloPersistentStateActorCommand> =
            Behaviors.setup { ctx -> HelloPersistentDurableStateActor(ctx, persistenceId) }
    }
}
기본 액터에 추가된 개념
// Test fragment: two greetings are sent to the durable-state actor and the
// resulting hello count is verified through a test probe.
val probe: TestProbe<Any> = testKit.createTestProbe()
val persistenceId = PersistenceId.ofUniqueId("HelloPersistentStateActor2")
val helloPersistentDurableStateActor = testKit.spawn(HelloPersistentDurableStateActor.create(persistenceId))

// With a database-backed state store the state saved for this fixed persistence id
// survives between test runs, so an earlier run may have left a non-zero hello count.
// Reset it first so the exact-count assertion below is repeatable.
helloPersistentDurableStateActor.tell(ResetHelloCount)

// Send Hello messages
helloPersistentDurableStateActor.tell(HelloPersistentDurable("Hello", probe.ref()))
helloPersistentDurableStateActor.tell(HelloPersistentDurable("Hello", probe.ref()))
probe.expectMessage(HelloResponse("Kotlin"))
probe.expectMessage(HelloResponse("Kotlin"))

// Verify the hello count
helloPersistentDurableStateActor.tell(GetHelloCountPersistentDurable(probe.ref()))
probe.expectMessage(HelloCountResponse(2))
이제 액터 모델의 상태변경에 따라 저장장치를 채택해 동기화되는 유용한 기능을 탑재하게 되었습니다.
![Akka > [Kotlin] PersistentDurableStateActor > image-2024-10-26_23-10-11.png](/download/attachments/93946389/image-2024-10-26_23-10-11.png?version=1&modificationDate=1729951811449&api=v2)
이러한 장치와 모델을 Akka 액터에서 제공하기 때문에, CQRS를 더 이상 이론으로만 다루지 않고 유닛 테스트로 검증하고 우리의 장치에서 실행해 볼 수 있습니다.
CRUD의 대표적인 연습 샘플이 게시판 만들기라면, CQRS의 대표적인 연습 샘플은 장바구니 만들기입니다. 대용량 트래픽을 처리할 수 있도록
고안된 다음 장바구니 예제를 보는 것을 추천합니다. (이벤트 소싱이 적용됨)
Next : 액터를 클러스터로 배치하기