Akka 라이선스 정책:

Pekko 라이선스 정책:

정리:


마이그레이션 

var akka_version = "2.6.0"
val scala_version = "2.13"
val pekko_version = "1.1.2"
val pekko_r2dbc_version = "1.0.0"
val pekko_jdbc_version = "1.1.0"

dependencies {
	// Typed Actor
    implementation("org.apache.pekko:pekko-actor-typed_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-serialization-jackson_$scala_version:$pekko_version")

    // Actor Persistence
    implementation("org.apache.pekko:pekko-persistence-typed_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-persistence-query_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-persistence-jdbc_$scala_version:$pekko_jdbc_version")
    implementation("org.apache.pekko:pekko-persistence-r2dbc_$scala_version:$pekko_r2dbc_version")
}


액터모델 자체는 AKKA진영에서 오랫동안 디벨롭했기때문에 오픈버전인 쿨다운버전을 사용해도 성숙도가 쌓였기때문에 큰 문제가 없지만

여기서 고려할내용은 액터모델을 영속화 하는 장치로 ReactiveStream기반의 R2DBC지원 버전입니다.


참고링크 : https://pekko.apache.org/docs/pekko-persistence-r2dbc/current/license-report.html

runtimeOnly("org.postgresql:r2dbc-postgresql:0.9.3.RELEASE")


하지만 이장에서는 플러그인 의존없는 레파지토리 패턴을 이용해 영속성을 부여해보겠습니다.


JDBC의 특징:

  1. 블로킹(Blocking) I/O 기반:

  2. 스레드풀 기반 확장:

  3. 전통적이고 광범위한 지원:

R2DBC의 특징 (Reactive Streams 중심):

  1. 논블로킹(Non-Blocking) I/O 기반:

  2. Reactive Streams와 백프레셔(Backpressure) 지원:

  3. 적은 스레드로도 높은 확장성:

정리:



쿨다운버전에서 R2DBC지원버전이 올라가기를 기다리거나, 이미 오랫동안 안전성을 유지한 JDBC버전을 사용하면 되지만

객체를 직접 데이터에 저장할때 용이한 방식이 ORM(JPA)라고 하면 액터모델에 발생하는 이벤트를 CQRS기법으로 저장을 지원하는것이  Perstence 장치이지만

이 장치를 포기하고 Mybatis를 이용하는것과같이 높은수준의 추상화를 포기하고 Repository 패턴을 구현해 의존성을 없애버리는 전략을 채택하였습니다.



Repository 패턴

DDL

CREATE TABLE IF NOT EXISTS durable_state (
                                             slice INT NOT NULL,
                                             entity_type VARCHAR(255) NOT NULL,
                                             persistence_id VARCHAR(255) NOT NULL,
                                             revision BIGINT NOT NULL,
                                             db_timestamp TIMESTAMP(6) NOT NULL,

                                             state_ser_id INTEGER NOT NULL,
                                             state_ser_manifest VARCHAR(255),
                                             state_payload BLOB NOT NULL,
                                             tags TEXT, -- FIXME no array type, is this the best option?

                                             PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);


R2DBC를 이용하기위해 Webplux를 이용하는 비동기 처리방식으로 구현되었습니다.

DAO/DTO

interface DurableStateDAO {

    // Pure Version
    fun findAll(): Flux<DurableState>
    fun findById(persistenceId: String, revision: Long): Mono<DurableState>
    fun create(durableState: DurableState): Mono<DurableState>
    fun update(durableState: DurableState): Mono<DurableState>
    fun createOrUpdate(durableState: DurableState): Mono<DurableState>
    fun deleteById(persistenceId: String, revision: Long): Mono<Void>

    // Template Version for Actor
    fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T>
    fun <T: Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T>
}

data class DurableState(
    val slice: Int,
    val entityType: String,
    val persistenceId: String,
    val revision: Long,
    val dbTimestamp: LocalDateTime,
    val stateSerId: Int,
    val stateSerManifest: String,
    val statePayload: ByteArray,
    val tags: String
){...}


Repotisoty 구현체

@Repository
class DurableStateDAOImpl(private val connectionFactory: ConnectionFactory) : DurableStateDAO {

    private val client = DatabaseClient.create(connectionFactory)

    private val objectMapper = jacksonObjectMapper()

    override fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T> {
        return client.sql("SELECT * FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .map { row, _ ->
                val entityType = row.get("entity_type", String::class.java)!!
                val statePayload = row.get("state_payload", ByteArray::class.java)!!
                objectMapper.readValue(statePayload, Class.forName(entityType)) as T
            }
            .one()
            .switchIfEmpty(Mono.justOrEmpty(null))
    }

    override fun <T : Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T> {
        val entityType = entity::class.java.name
        val statePayload = objectMapper.writeValueAsBytes(entity)
        val durableState = DurableState(
            slice = 0, // Set appropriate values
            entityType = entityType,
            persistenceId = persistenceId,
            revision = revision,
            dbTimestamp = java.time.LocalDateTime.now(),
            stateSerId = 0, // Set appropriate values
            stateSerManifest = "",
            statePayload = statePayload,
            tags = ""
        )
        return createOrUpdate(durableState).thenReturn(entity)
    }

    override fun findAll(): Flux<DurableState> {
        return client.sql("SELECT * FROM durable_state")
            .map { row, _ ->
                DurableState(
                    row.get("slice", Int::class.java)!!,
                    row.get("entity_type", String::class.java)!!,
                    row.get("persistence_id", String::class.java)!!,
                    row.get("revision", Long::class.java)!!,
                    row.get("db_timestamp", java.time.LocalDateTime::class.java)!!,
                    row.get("state_ser_id", Int::class.java)!!,
                    row.get("state_ser_manifest", String::class.java )!!,
                    row.get("state_payload", ByteArray::class.java)!!,
                    row.get("tags", String::class.java)!!
                )
            }
            .all()
    }

    override fun findById(persistenceId: String, revision: Long): Mono<DurableState> {
        return client.sql("SELECT * FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .map { row, _ ->
                DurableState(
                    row.get("slice", Int::class.java)!!,
                    row.get("entity_type", String::class.java)!!,
                    row.get("persistence_id", String::class.java)!!,
                    row.get("revision", Long::class.java)!!,
                    row.get("db_timestamp", java.time.LocalDateTime::class.java)!!,
                    row.get("state_ser_id", Int::class.java)!!,
                    row.get("state_ser_manifest", String::class.java)!!,
                    row.get("state_payload", ByteArray::class.java)!!,
                    row.get("tags", String::class.java)!!
                )
            }
            .one()
            .switchIfEmpty(Mono.justOrEmpty(null))
    }

    override fun create(durableState: DurableState): Mono<DurableState> {
        return client.sql("INSERT INTO durable_state (slice, entity_type, persistence_id, revision, db_timestamp, state_ser_id, state_ser_manifest, state_payload, tags) VALUES (:slice, :entityType, :persistenceId, :revision, :dbTimestamp, :stateSerId, :stateSerManifest, :statePayload, :tags)")
            .bind("slice", durableState.slice)
            .bind("entityType", durableState.entityType)
            .bind("persistenceId", durableState.persistenceId)
            .bind("revision", durableState.revision)
            .bind("dbTimestamp", durableState.dbTimestamp)
            .bind("stateSerId", durableState.stateSerId)
            .bind("stateSerManifest", durableState.stateSerManifest)
            .bind("statePayload", durableState.statePayload)
            .bind("tags", durableState.tags)
            .then()
            .thenReturn(durableState)
    }

    override fun update(durableState: DurableState): Mono<DurableState> {
        return client.sql("UPDATE durable_state SET slice = :slice, entity_type = :entityType, db_timestamp = :dbTimestamp, state_ser_id = :stateSerId, state_ser_manifest = :stateSerManifest, state_payload = :statePayload, tags = :tags WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("slice", durableState.slice)
            .bind("entityType", durableState.entityType)
            .bind("dbTimestamp", durableState.dbTimestamp)
            .bind("stateSerId", durableState.stateSerId)
            .bind("stateSerManifest", durableState.stateSerManifest)
            .bind("statePayload", durableState.statePayload)
            .bind("tags", durableState.tags)
            .bind("persistenceId", durableState.persistenceId)
            .bind("revision", durableState.revision)
            .then()
            .thenReturn(durableState)
    }

    override fun createOrUpdate(durableState: DurableState): Mono<DurableState> {
        return findById(durableState.persistenceId, durableState.revision)
            .flatMap { _ ->
                update(durableState).thenReturn(durableState)
            }
            .switchIfEmpty(create(durableState).thenReturn(durableState))
    }

    override fun deleteById(persistenceId: String, revision: Long): Mono<Void> {
        return client.sql("DELETE FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .then()
    }
}

@Repository
class DurableRepository(private val durableStateDAO: DurableStateDAO) {

    fun getAllDurableStates(): Flux<DurableState> {
        return durableStateDAO.findAll()
    }

    fun getDurableStateById(persistenceId: String, revision: Long): Mono<DurableState> {
        return durableStateDAO.findById(persistenceId, revision)
    }

    fun createDurableState(durableState: DurableState): Mono<DurableState> {
        return durableStateDAO.create(durableState)
    }

    fun updateDurableState(durableState: DurableState): Mono<DurableState> {
        return durableStateDAO.update(durableState)
    }

    fun createOrUpdateDurableState(durableState: DurableState) : Mono<DurableState>{
        return durableStateDAO.createOrUpdate(durableState)
    }

    fun deleteDurableStateById(persistenceId: String, revision: Long): Mono<Void> {
        return durableStateDAO.deleteById(persistenceId, revision)
    }

    fun<T: Any> createOrUpdateDurableStateEx(persistenceId: String, revision: Long, entity: T) : Mono<T> {
       return durableStateDAO.createOrUpdateEx(persistenceId, revision, entity)
    }

    fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T>{
        return durableStateDAO.findByIdEx(persistenceId, revision)
    }
}


R2DBC를 레파지토리화 하느라 코드가 장황했지만 최초 복원용을 위한 Read ,  마지막 상태값 저장(persist)를 위해

CreateOrUpdata 저장 함수만 이용하며 Value객체의 자동 직렬화/역직렬화를 위해 자바객체를 모두 받을수 있는 템플릿 형태로 구현되었습니다.

커스텀한 영속성이 적용한 액터모델 구현

sealed class HelloStateStoreActorCommand
data class HelloStore(val message: String, val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class GetHelloCountStore(val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class GetHelloTotalCountStore(val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class ChangeStateStore(val newHappy: HappyState) : HelloStateStoreActorCommand()
data class HelloLimitStore(val message: String, val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
object ResetHelloCountStore : HelloStateStoreActorCommand()
object StopResetTimer : HelloStateStoreActorCommand()

/** HelloStateActor 반환할 수 있는 응답들 */
sealed class HelloStateStoreActorResponse
data class HelloResponseStore(val message: String) : HelloStateStoreActorResponse()
data class HelloCountResponseStore(val count: Int) : HelloStateStoreActorResponse()

/** 상태 정의 */
enum class HappyState {
    HAPPY, ANGRY
}

data class HelloStoreState (
    var happyState: HappyState,
    var helloCount: Int,
    var helloTotalCount: Int
)

/** HelloStateActor 클래스 */
class HelloStateStoreActor private constructor(
    private val context: ActorContext<HelloStateStoreActorCommand>,
    private val timers: TimerScheduler<HelloStateStoreActorCommand>,
    private val persistenceId : String,
    private val durableRepository: DurableRepository,
) : AbstractBehavior<HelloStateStoreActorCommand>(context) {

    companion object {
        fun create(persistenceId: String, durableRepository: DurableRepository): Behavior<HelloStateStoreActorCommand> {
            return Behaviors.withTimers { timers ->
                Behaviors.setup { context -> HelloStateStoreActor(context, timers, persistenceId, durableRepository) }
            }
        }
    }

    private lateinit var helloStoreState: HelloStoreState

    init {

        initStoreState()

        //timers.startTimerAtFixedRate(ResetHelloCountStore, Duration.ofSeconds(60))

    }

    override fun createReceive(): Receive<HelloStateStoreActorCommand> {
        return newReceiveBuilder()
            .onMessage(HelloStore::class.java, this::onHello)
            .onMessage(HelloLimitStore::class.java, this::onHelloLimit)
            .onMessage(GetHelloCountStore::class.java, this::onGetHelloCount)
            .onMessage(GetHelloTotalCountStore::class.java, this::onGetHelloTotalCount)
            .onMessage(ChangeStateStore::class.java, this::onChangeState)
            .onMessage(ResetHelloCountStore::class.java, this::onResetHelloCount)
            .onMessage(StopResetTimer::class.java) {
                timers.cancel(ResetHelloCountStore)
                Behaviors.same()
            }
            .build()
    }

    private val materializer = Materializer.createMaterializer(context.system)

    private val helloLimitSource = Source.queue<HelloLimitStore>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd ->
            context.self.tell(HelloStore(cmd.message, cmd.replyTo))
        })
        .run(materializer)

    private fun onHello(command: HelloStore): Behavior<HelloStateStoreActorCommand> {
        when (helloStoreState.happyState) {
            HappyState.HAPPY -> {
                if (command.message == "Hello") {
                    helloStoreState.helloCount++
                    helloStoreState.helloTotalCount++

                    persist(helloStoreState)

                    command.replyTo.tell(HelloResponseStore("Kotlin"))
                    context.log.info("onHello-Kotlin")
                }
            }
            HappyState.ANGRY -> {
                command.replyTo.tell(HelloResponseStore("Don't talk to me!"))
            }
        }
        return this
    }

    private fun onHelloLimit(command: HelloLimitStore): Behavior<HelloStateStoreActorCommand> {
        helloLimitSource.offer(command).thenAccept { result ->
            when (result) {
                is QueueOfferResult.`Enqueued$` -> context.log.info("Command enqueued successfully")
                is QueueOfferResult.`Dropped$` -> context.log.error("Command dropped")
                is QueueOfferResult.Failure -> context.log.error("Failed to enqueue command", result.cause())
                is QueueOfferResult.`QueueClosed$` -> context.log.error("Queue was closed")
            }
        }
        return this
    }

    private fun onGetHelloCount(command: GetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloCount))
        context.log.info("onGetHelloCount-helloCount: ${helloStoreState.helloCount}")
        return this
    }

    private fun onGetHelloTotalCount(command: GetHelloTotalCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloTotalCount))
        return this
    }

    private fun onChangeState(command: ChangeStateStore): Behavior<HelloStateStoreActorCommand> {
        helloStoreState = helloStoreState.copy(
            happyState = command.newHappy
        )

        persist(helloStoreState)
        return this
    }

    private fun onResetHelloCount(command: ResetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        context.log.info("Resetting hello count")
        helloStoreState.helloCount = 0

        persist(helloStoreState)
        return this
    }

    private fun persist(newState: HelloStoreState) {
        runBlocking {
            durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, newState).awaitSingle()
        }
    }

    private fun initStoreState() {
        runBlocking {
            var result = durableRepository.findByIdEx<HelloStoreState>(persistenceId, 1L).awaitFirstOrNull()
            if(result != null) {
                helloStoreState = result
            }
            else{
                helloStoreState = HelloStoreState(HappyState.HAPPY, 0, 0)
            }
        }
    }
}


유닛테스트 코드

@ExtendWith(SpringExtension::class)
@DataR2dbcTest
@ActiveProfiles("test")
@ComponentScan(basePackages = ["org.example.kotlinbootreactivelabs.repositories.durable"])
class HelloStateStoreActorTest {

    @Autowired
    lateinit var durableRepository: DurableRepository

    companion object {
        private lateinit var testKit: ActorTestKit

        @BeforeAll
        @JvmStatic
        fun setup() {
            testKit = ActorTestKit.create()
        }

        @AfterAll
        @JvmStatic
        fun teardown() {
            testKit.shutdownTestKit()
        }
    }

    @Test
    fun testHelloStore() {
        val probe: TestProbe<Any> = testKit.createTestProbe()
        val helloStateStoreActor = testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

        helloStateStoreActor.tell(ChangeStateStore(HappyState.HAPPY))
        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))
        probe.expectMessage(HelloResponseStore("Kotlin"))

    }

    @Test
    fun testChangeState() {
        val probe: TestProbe<Any> = testKit.createTestProbe()
        val helloStateStoreActor = testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

        helloStateStoreActor.tell(ChangeStateStore(HappyState.ANGRY))
        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))
        probe.expectMessage(HelloResponseStore("Don't talk to me!"))
    }

    @Test
    fun testResetHelloCount()  {
        val probe: TestProbe<Any> = testKit.createTestProbe()
        val helloStateStoreActor = testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

        helloStateStoreActor.tell(ChangeStateStore(HappyState.HAPPY))
        helloStateStoreActor.tell(ResetHelloCountStore)

        helloStateStoreActor.tell(GetHelloCountStore(probe.ref()))
        probe.expectMessage(HelloCountResponseStore(0))

        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))
        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))

        probe.expectMessage(HelloResponseStore("Kotlin"))
        probe.expectMessage(HelloResponseStore("Kotlin"))

        helloStateStoreActor.tell(GetHelloCountStore(probe.ref()))
        probe.expectMessage(HelloCountResponseStore(2))

    }
}


이상 Pekko를 통해 알아본 액터모델 영속성장치 커스텀하게 사용하기였습니다.