Akka 라이선스 정책:
Pekko 라이선스 정책:
정리:
// Version constants for the dependency coordinates declared below.
// akka_version is not referenced by any dependency in this snippet.
val akka_version = "2.6.0"
val scala_version = "2.13"
val pekko_version = "1.1.2"
val pekko_r2dbc_version = "1.0.0"
val pekko_jdbc_version = "1.1.0"

dependencies {
    // Typed Actor
    implementation("org.apache.pekko:pekko-actor-typed_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-serialization-jackson_$scala_version:$pekko_version")

    // Actor Persistence
    implementation("org.apache.pekko:pekko-persistence-typed_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-persistence-query_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-persistence-jdbc_$scala_version:$pekko_jdbc_version")
    implementation("org.apache.pekko:pekko-persistence-r2dbc_$scala_version:$pekko_r2dbc_version")
}
액터모델 자체는 Akka 진영에서 오랫동안 발전시켜 왔기 때문에, 오픈 버전인 쿨다운 버전을 사용해도 성숙도가 충분히 쌓여 있어 큰 문제가 없지만
여기서 고려할 내용은 액터모델을 영속화하는 장치로, Reactive Streams 기반의 R2DBC 지원 버전입니다.
참고링크 : https://pekko.apache.org/docs/pekko-persistence-r2dbc/current/license-report.html
// Adds the PostgreSQL R2DBC driver (r2dbc-postgresql 0.9.3.RELEASE) to the runtime classpath only.
runtimeOnly("org.postgresql:r2dbc-postgresql:0.9.3.RELEASE") |
하지만 이 장에서는 플러그인에 의존하지 않는 레파지토리 패턴을 이용해 영속성을 부여해 보겠습니다.
JDBC의 특징:
블로킹(Blocking) I/O 기반:
스레드풀 기반 확장:
전통적이고 광범위한 지원:
R2DBC의 특징 (Reactive Streams 중심):
논블로킹(Non-Blocking) I/O 기반:
Publisher
형태로 반환하기 때문에, 호출 즉시 스레드를 점유하지 않고 결과가 준비될 때 비동기적으로 데이터를 흘려보냅니다.
Reactive Streams와 백프레셔(Backpressure) 지원:
적은 스레드로도 높은 확장성:
정리:
쿨다운 버전에서 R2DBC 지원 버전이 올라가기를 기다리거나, 이미 오랫동안 안정성을 유지한 JDBC 버전을 사용하면 되지만
객체를 직접 데이터에 저장할 때 용이한 방식이 ORM(JPA)이라고 하면, 액터모델에서 발생하는 이벤트를 CQRS 기법으로 저장하도록 지원하는 것이 Persistence 장치이지만
이 장치를 포기하고, MyBatis를 이용하는 것과 같이 높은 수준의 추상화를 포기하는 대신 Repository 패턴을 직접 구현해 의존성을 없애 버리는 전략을 채택하였습니다.
DDL
-- Durable state table: one row per (persistence_id, revision).
CREATE TABLE IF NOT EXISTS durable_state (
    slice              INT          NOT NULL,
    entity_type        VARCHAR(255) NOT NULL,
    persistence_id     VARCHAR(255) NOT NULL,
    revision           BIGINT       NOT NULL,
    db_timestamp       TIMESTAMP(6) NOT NULL,
    state_ser_id       INTEGER      NOT NULL,
    state_ser_manifest VARCHAR(255),
    state_payload      BLOB         NOT NULL,
    tags               TEXT, -- FIXME no array type, is this the best option?
    PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX durable_state_slice_idx
    ON durable_state(slice, entity_type, db_timestamp, revision);
R2DBC를 이용하기 위해 WebFlux를 이용하는 비동기 처리 방식으로 구현되었습니다.
/**
 * Reactive data-access contract for `durable_state` rows.
 *
 * Rows are addressed by the pair (`persistenceId`, `revision`). Single-row operations
 * return a [Mono], the full listing returns a [Flux].
 */
interface DurableStateDAO {

    // Pure Version: works directly with DurableState rows.

    /** Emits every stored [DurableState]. */
    fun findAll(): Flux<DurableState>

    /** Looks up the row identified by [persistenceId] and [revision]. */
    fun findById(persistenceId: String, revision: Long): Mono<DurableState>

    /** Inserts [durableState] as a new row. */
    fun create(durableState: DurableState): Mono<DurableState>

    /** Updates the existing row that has the same persistence id and revision as [durableState]. */
    fun update(durableState: DurableState): Mono<DurableState>

    /** Inserts [durableState], or updates it when a matching row already exists. */
    fun createOrUpdate(durableState: DurableState): Mono<DurableState>

    /** Deletes the row identified by [persistenceId] and [revision]. */
    fun deleteById(persistenceId: String, revision: Long): Mono<Void>

    // Template Version for Actor: works with arbitrary entity objects.

    /** Looks up the row identified by [persistenceId] and [revision] and returns it as an entity of type [T]. */
    fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T>

    /** Stores [entity] under [persistenceId] and [revision], creating or updating the row. */
    fun <T: Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T>
}

/** One row of the `durable_state` table. */
data class DurableState(
    val slice: Int,
    val entityType: String,
    val persistenceId: String,
    val revision: Long,
    val dbTimestamp: LocalDateTime,
    val stateSerId: Int,
    val stateSerManifest: String,
    val statePayload: ByteArray,
    val tags: String
){...} // class body is elided in the original listing
/**
 * [DurableStateDAO] implementation that runs SQL against the `durable_state` table
 * through Spring's [DatabaseClient].
 *
 * The `*Ex` methods store an entity as Jackson-serialized bytes in `state_payload`
 * and record the entity's class name in `entity_type`, so it can be read back as an object.
 */
@Repository
class DurableStateDAOImpl(private val connectionFactory: ConnectionFactory) : DurableStateDAO {

    private val client = DatabaseClient.create(connectionFactory)
    private val objectMapper = jacksonObjectMapper()

    /**
     * Reads the row for ([persistenceId], [revision]) and deserializes `state_payload`
     * into the class named by the row's `entity_type`.
     *
     * Completes empty when no row matches. The cast to [T] is unchecked: the caller must
     * request the type that was originally stored.
     */
    @Suppress("UNCHECKED_CAST")
    override fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T> =
        client.sql(SELECT_BY_ID_SQL)
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .map { row, _ ->
                val entityType = row.required("entity_type", String::class.java)
                val statePayload = row.required("state_payload", ByteArray::class.java)
                // NOTE(review): the class to load comes from the database row; make sure only
                // trusted writers can populate entity_type.
                objectMapper.readValue(statePayload, Class.forName(entityType)) as T
            }
            .one()

    /**
     * Serializes [entity] with Jackson and stores it under ([persistenceId], [revision]).
     *
     * `slice`, `stateSerId`, `stateSerManifest` and `tags` are written with placeholder values.
     * Emits [entity] once the row has been written.
     */
    override fun <T : Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T> {
        val durableState = DurableState(
            slice = 0, // Set appropriate values
            entityType = entity::class.java.name,
            persistenceId = persistenceId,
            revision = revision,
            dbTimestamp = java.time.LocalDateTime.now(),
            stateSerId = 0, // Set appropriate values
            stateSerManifest = "",
            statePayload = objectMapper.writeValueAsBytes(entity),
            tags = ""
        )
        return createOrUpdate(durableState).thenReturn(entity)
    }

    /** Emits every row of `durable_state`. */
    override fun findAll(): Flux<DurableState> =
        client.sql("SELECT * FROM durable_state")
            .map { row, _ -> toDurableState(row) }
            .all()

    /** Emits the row for ([persistenceId], [revision]), or completes empty when there is none. */
    override fun findById(persistenceId: String, revision: Long): Mono<DurableState> =
        client.sql(SELECT_BY_ID_SQL)
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .map { row, _ -> toDurableState(row) }
            .one()

    /** Inserts [durableState] and emits it once the statement has completed. */
    override fun create(durableState: DurableState): Mono<DurableState> =
        client.sql("INSERT INTO durable_state (slice, entity_type, persistence_id, revision, db_timestamp, state_ser_id, state_ser_manifest, state_payload, tags) VALUES (:slice, :entityType, :persistenceId, :revision, :dbTimestamp, :stateSerId, :stateSerManifest, :statePayload, :tags)")
            .bind("slice", durableState.slice)
            .bind("entityType", durableState.entityType)
            .bind("persistenceId", durableState.persistenceId)
            .bind("revision", durableState.revision)
            .bind("dbTimestamp", durableState.dbTimestamp)
            .bind("stateSerId", durableState.stateSerId)
            .bind("stateSerManifest", durableState.stateSerManifest)
            .bind("statePayload", durableState.statePayload)
            .bind("tags", durableState.tags)
            .then()
            .thenReturn(durableState)

    /**
     * Updates the row matching [durableState]'s persistence id and revision and emits
     * [durableState] once the statement has completed (whether or not a row was affected).
     */
    override fun update(durableState: DurableState): Mono<DurableState> =
        client.sql("UPDATE durable_state SET slice = :slice, entity_type = :entityType, db_timestamp = :dbTimestamp, state_ser_id = :stateSerId, state_ser_manifest = :stateSerManifest, state_payload = :statePayload, tags = :tags WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("slice", durableState.slice)
            .bind("entityType", durableState.entityType)
            .bind("dbTimestamp", durableState.dbTimestamp)
            .bind("stateSerId", durableState.stateSerId)
            .bind("stateSerManifest", durableState.stateSerManifest)
            .bind("statePayload", durableState.statePayload)
            .bind("tags", durableState.tags)
            .bind("persistenceId", durableState.persistenceId)
            .bind("revision", durableState.revision)
            .then()
            .thenReturn(durableState)

    /**
     * Updates the row when it already exists, otherwise inserts it.
     *
     * The existence check and the write are separate statements, so this is not atomic.
     */
    override fun createOrUpdate(durableState: DurableState): Mono<DurableState> =
        findById(durableState.persistenceId, durableState.revision)
            .flatMap { update(durableState) }
            // defer: only build the INSERT when the lookup really came back empty
            .switchIfEmpty(Mono.defer { create(durableState) })

    /** Deletes the row for ([persistenceId], [revision]). */
    override fun deleteById(persistenceId: String, revision: Long): Mono<Void> =
        client.sql("DELETE FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .then()

    /** Maps one result row onto a [DurableState]. */
    private fun toDurableState(row: io.r2dbc.spi.Row): DurableState =
        DurableState(
            // javaObjectType: Int::class.java / Long::class.java are the primitive classes;
            // the boxed classes are what Row.get actually returns.
            slice = row.required("slice", Int::class.javaObjectType),
            entityType = row.required("entity_type", String::class.java),
            persistenceId = row.required("persistence_id", String::class.java),
            revision = row.required("revision", Long::class.javaObjectType),
            dbTimestamp = row.required("db_timestamp", java.time.LocalDateTime::class.java),
            stateSerId = row.required("state_ser_id", Int::class.javaObjectType),
            // state_ser_manifest and tags are nullable in the DDL; map NULL to "".
            stateSerManifest = row.get("state_ser_manifest", String::class.java).orEmpty(),
            statePayload = row.required("state_payload", ByteArray::class.java),
            tags = row.get("tags", String::class.java).orEmpty()
        )

    /** Reads a column declared NOT NULL, failing with a descriptive message if it is NULL anyway. */
    private fun <V : Any> io.r2dbc.spi.Row.required(column: String, type: Class<V>): V =
        checkNotNull(get(column, type)) { "durable_state.$column is unexpectedly NULL" }

    private companion object {
        const val SELECT_BY_ID_SQL =
            "SELECT * FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision"
    }
}

/**
 * Repository facade over [DurableStateDAO]; every method delegates directly to the DAO.
 */
@Repository
class DurableRepository(private val durableStateDAO: DurableStateDAO) {

    /** Delegates to [DurableStateDAO.findAll]. */
    fun getAllDurableStates(): Flux<DurableState> = durableStateDAO.findAll()

    /** Delegates to [DurableStateDAO.findById]. */
    fun getDurableStateById(persistenceId: String, revision: Long): Mono<DurableState> =
        durableStateDAO.findById(persistenceId, revision)

    /** Delegates to [DurableStateDAO.create]. */
    fun createDurableState(durableState: DurableState): Mono<DurableState> =
        durableStateDAO.create(durableState)

    /** Delegates to [DurableStateDAO.update]. */
    fun updateDurableState(durableState: DurableState): Mono<DurableState> =
        durableStateDAO.update(durableState)

    /** Delegates to [DurableStateDAO.createOrUpdate]. */
    fun createOrUpdateDurableState(durableState: DurableState): Mono<DurableState> =
        durableStateDAO.createOrUpdate(durableState)

    /** Delegates to [DurableStateDAO.deleteById]. */
    fun deleteDurableStateById(persistenceId: String, revision: Long): Mono<Void> =
        durableStateDAO.deleteById(persistenceId, revision)

    /** Delegates to [DurableStateDAO.createOrUpdateEx]. */
    fun <T : Any> createOrUpdateDurableStateEx(persistenceId: String, revision: Long, entity: T): Mono<T> =
        durableStateDAO.createOrUpdateEx(persistenceId, revision, entity)

    /** Delegates to [DurableStateDAO.findByIdEx]. */
    fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T> =
        durableStateDAO.findByIdEx(persistenceId, revision)
}
R2DBC를 레파지토리화하느라 코드가 장황했지만, 실제로는 최초 복원을 위한 Read와 마지막 상태값 저장(persist)을 위한
CreateOrUpdate 저장 함수만 이용하며, Value 객체의 자동 직렬화/역직렬화를 위해 자바 객체를 모두 받을 수 있는 템플릿 형태로 구현되었습니다.
/** Commands accepted by [HelloStateStoreActor]. */
sealed class HelloStateStoreActorCommand
data class HelloStore(val message: String, val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class GetHelloCountStore(val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class GetHelloTotalCountStore(val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class ChangeStateStore(val newHappy: HappyState) : HelloStateStoreActorCommand()
data class HelloLimitStore(val message: String, val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
object ResetHelloCountStore : HelloStateStoreActorCommand()
object StopResetTimer : HelloStateStoreActorCommand()

/** Responses that HelloStateStoreActor can return. */
sealed class HelloStateStoreActorResponse
data class HelloResponseStore(val message: String) : HelloStateStoreActorResponse()
data class HelloCountResponseStore(val count: Int) : HelloStateStoreActorResponse()

/** State definition. */
enum class HappyState {
    HAPPY, ANGRY
}

/** The actor state that is saved to and restored from the durable repository. */
data class HelloStoreState(
    var happyState: HappyState,
    var helloCount: Int,
    var helloTotalCount: Int
)

/**
 * Typed actor that counts "Hello" messages and saves its [HelloStoreState] through
 * [DurableRepository] after every state change.
 *
 * The state is loaded from the repository when the actor is constructed; when nothing is
 * stored yet it starts as `HAPPY` with both counters at 0. Loading and saving block the
 * actor's thread (`runBlocking`) until the repository call completes.
 */
class HelloStateStoreActor private constructor(
    private val context: ActorContext<HelloStateStoreActorCommand>,
    private val timers: TimerScheduler<HelloStateStoreActorCommand>,
    private val persistenceId: String,
    private val durableRepository: DurableRepository,
) : AbstractBehavior<HelloStateStoreActorCommand>(context) {

    // Restored from the repository on construction.
    // The periodic reset timer is currently disabled:
    // timers.startTimerAtFixedRate(ResetHelloCountStore, Duration.ofSeconds(60))
    private var helloStoreState: HelloStoreState = restoreState()

    // Resolved once on the actor thread so the stream sink below does not touch the ActorContext.
    private val selfRef = context.self

    // NOTE(review): this materializer is created from the actor system, not from this actor's
    // context, so the throttling stream is presumably not stopped with the actor. Confirm intent.
    private val materializer = Materializer.createMaterializer(context.system)

    // Queue that forwards HelloLimitStore commands back to this actor as HelloStore,
    // throttled to 3 elements per second.
    private val helloLimitSource = Source.queue<HelloLimitStore>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd -> selfRef.tell(HelloStore(cmd.message, cmd.replyTo)) })
        .run(materializer)

    override fun createReceive(): Receive<HelloStateStoreActorCommand> {
        return newReceiveBuilder()
            .onMessage(HelloStore::class.java, this::onHello)
            .onMessage(HelloLimitStore::class.java, this::onHelloLimit)
            .onMessage(GetHelloCountStore::class.java, this::onGetHelloCount)
            .onMessage(GetHelloTotalCountStore::class.java, this::onGetHelloTotalCount)
            .onMessage(ChangeStateStore::class.java, this::onChangeState)
            .onMessage(ResetHelloCountStore::class.java, this::onResetHelloCount)
            .onMessage(StopResetTimer::class.java) {
                timers.cancel(ResetHelloCountStore)
                Behaviors.same()
            }
            .build()
    }

    /**
     * HAPPY: a "Hello" message increments both counters, saves the state and replies "Kotlin";
     * any other message is ignored without a reply.
     * ANGRY: always replies "Don't talk to me!" and leaves the state untouched.
     */
    private fun onHello(command: HelloStore): Behavior<HelloStateStoreActorCommand> {
        when (helloStoreState.happyState) {
            HappyState.HAPPY -> {
                if (command.message == "Hello") {
                    helloStoreState = helloStoreState.copy(
                        helloCount = helloStoreState.helloCount + 1,
                        helloTotalCount = helloStoreState.helloTotalCount + 1
                    )
                    persist(helloStoreState)
                    command.replyTo.tell(HelloResponseStore("Kotlin"))
                    context.log.info("onHello-Kotlin")
                }
            }
            HappyState.ANGRY -> {
                command.replyTo.tell(HelloResponseStore("Don't talk to me!"))
            }
        }
        return this
    }

    /** Offers the command to the throttled queue and logs the outcome of the offer. */
    private fun onHelloLimit(command: HelloLimitStore): Behavior<HelloStateStoreActorCommand> {
        // Resolve the logger here, on the actor thread: the callback below may run on another
        // thread, where ActorContext.log must not be accessed.
        val log = context.log
        helloLimitSource.offer(command).thenAccept { result ->
            when (result) {
                is QueueOfferResult.`Enqueued$` -> log.info("Command enqueued successfully")
                is QueueOfferResult.`Dropped$` -> log.error("Command dropped")
                is QueueOfferResult.Failure -> log.error("Failed to enqueue command", result.cause())
                is QueueOfferResult.`QueueClosed$` -> log.error("Queue was closed")
            }
        }
        return this
    }

    /** Replies with the current hello count. */
    private fun onGetHelloCount(command: GetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloCount))
        context.log.info("onGetHelloCount-helloCount: ${helloStoreState.helloCount}")
        return this
    }

    /** Replies with the total hello count. */
    private fun onGetHelloTotalCount(command: GetHelloTotalCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloTotalCount))
        return this
    }

    /** Switches the happy state and saves the new state. */
    private fun onChangeState(command: ChangeStateStore): Behavior<HelloStateStoreActorCommand> {
        helloStoreState = helloStoreState.copy(happyState = command.newHappy)
        persist(helloStoreState)
        return this
    }

    /** Sets the hello count back to 0 (the total count is kept) and saves the state. */
    private fun onResetHelloCount(command: ResetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        context.log.info("Resetting hello count")
        helloStoreState = helloStoreState.copy(helloCount = 0)
        persist(helloStoreState)
        return this
    }

    /** Saves [newState] under this actor's persistence id, blocking until the write completes. */
    private fun persist(newState: HelloStoreState) {
        runBlocking {
            durableRepository.createOrUpdateDurableStateEx(persistenceId, STATE_REVISION, newState).awaitSingle()
        }
    }

    /** Loads the stored state, blocking until done; falls back to HAPPY with zeroed counters. */
    private fun restoreState(): HelloStoreState = runBlocking {
        durableRepository.findByIdEx<HelloStoreState>(persistenceId, STATE_REVISION).awaitFirstOrNull()
            ?: HelloStoreState(HappyState.HAPPY, 0, 0)
    }

    companion object {
        // The actor always reads and overwrites this single revision.
        private const val STATE_REVISION = 1L

        /** Creates the actor behavior with timer support for the given persistence id. */
        fun create(persistenceId: String, durableRepository: DurableRepository): Behavior<HelloStateStoreActorCommand> {
            return Behaviors.withTimers { timers ->
                Behaviors.setup { context ->
                    HelloStateStoreActor(context, timers, persistenceId, durableRepository)
                }
            }
        }
    }
}
/**
 * Tests [HelloStateStoreActor] against the real [DurableRepository] wired by the R2DBC test slice.
 *
 * All tests use the same persistence id, so each test first sends a command that puts the
 * actor into the state it needs.
 */
@ExtendWith(SpringExtension::class)
@DataR2dbcTest
@ActiveProfiles("test")
@ComponentScan(basePackages = ["org.example.kotlinbootreactivelabs.repositories.durable"])
class HelloStateStoreActorTest {

    @Autowired
    lateinit var durableRepository: DurableRepository

    companion object {
        private lateinit var testKit: ActorTestKit

        @BeforeAll
        @JvmStatic
        fun setup() {
            testKit = ActorTestKit.create()
        }

        @AfterAll
        @JvmStatic
        fun teardown() {
            testKit.shutdownTestKit()
        }
    }

    /** Spawns a fresh actor instance bound to the shared test persistence id. */
    private fun spawnActor() =
        testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

    @Test
    fun testHelloStore() {
        val probe = testKit.createTestProbe<Any>()
        val actor = spawnActor()

        actor.tell(ChangeStateStore(HappyState.HAPPY))
        actor.tell(HelloStore("Hello", probe.ref()))

        probe.expectMessage(HelloResponseStore("Kotlin"))
    }

    @Test
    fun testChangeState() {
        val probe = testKit.createTestProbe<Any>()
        val actor = spawnActor()

        actor.tell(ChangeStateStore(HappyState.ANGRY))
        actor.tell(HelloStore("Hello", probe.ref()))

        probe.expectMessage(HelloResponseStore("Don't talk to me!"))
    }

    @Test
    fun testResetHelloCount() {
        val probe = testKit.createTestProbe<Any>()
        val actor = spawnActor()

        // Start from a known state: HAPPY with the hello count reset to 0.
        actor.tell(ChangeStateStore(HappyState.HAPPY))
        actor.tell(ResetHelloCountStore)
        actor.tell(GetHelloCountStore(probe.ref()))
        probe.expectMessage(HelloCountResponseStore(0))

        // Two greetings must produce two replies and a count of 2.
        actor.tell(HelloStore("Hello", probe.ref()))
        actor.tell(HelloStore("Hello", probe.ref()))
        probe.expectMessage(HelloResponseStore("Kotlin"))
        probe.expectMessage(HelloResponseStore("Kotlin"))

        actor.tell(GetHelloCountStore(probe.ref()))
        probe.expectMessage(HelloCountResponseStore(2))
    }
}
이상 Pekko를 통해 알아본, 액터모델 영속성 장치를 커스텀하게 사용하는 방법이었습니다.