You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Current »

Akka 라이선스 정책:

  • 이전 상태(과거): Akka는 처음 개발될 때부터 오픈소스 프레임워크로서 Apache License 2.0 하에 배포되어 누구나 자유롭게 사용, 수정, 재배포할 수 있었습니다.
  • 변경 배경: Akka를 개발한 Lightbend는 프로젝트 유지와 지속 가능한 비즈니스 모델을 확립하기 위해 기존의 완전 오픈소스 정책에서 벗어나, 상업적 사용에 대한 새로운 수익 창출 방식을 모색하게 되었습니다.
  • Business Source License (BSL):
    • 핵심 개념: Akka는 특정 버전(2022년 9월 이후 릴리즈된 버전)부터 Business Source License를 적용하고 있습니다. BSL은 기본적으로 소스 코드를 공개하지만, 상업적 용도에 대해 일정한 제한을 두는 라이선스입니다.
    • 사용 조건:
      • 개인적·비상업적 사용은 무료이며, 해당 소스를 열람하고 수정하는 것이 가능합니다.
      • 상업적 사용(특히 일정 규모 이상의 매출 또는 사용자 수를 가지는 기업 등)에 대해서는 라이선스 비용 지불 또는 Lightbend의 상용 구독 모델 이용이 요구될 수 있습니다.
    • 시간 경과에 따른 전환: BSL은 일정한 “쿨다운(cooldown) 기간”을 두는데, 이 기간이 지나면 해당 버전의 코드는 Apache License 2.0과 같이 완전한 오픈소스로 전환됩니다. 즉, 발표된 후 일정 시간이 흐르면 결국은 자유로운 오픈소스가 되지만, 그 기간 동안 상업적 활용에는 제약이 있는 구조입니다.

Pekko 라이선스 정책:

  • 프로젝트 기원: Pekko는 Akka 커뮤니티와 Apache Software Foundation(ASF)이 Akka의 라이선스 변화 이후 오픈소스 정신을 유지하고자 하는 취지에서 출발한 프로젝트입니다.
  • Apache License 2.0:
    • Pekko는 Apache Software Foundation에 의해 관리되며 완전한 오픈소스 라이선스(아파치 라이선스 2.0)를 적용하고 있습니다.
    • 이 라이선스는 상업적·비상업적 용도를 비롯해 배포, 수정, 재사용에 대한 어떤 제한도 두지 않으며, 로열티나 라이선스 비용 없이 자유롭게 사용할 수 있습니다.
    • Apache Software Foundation의 거버넌스 모델 덕분에 프로젝트 발전 방향과 라이선스 정책 변화가 투명하고 커뮤니티 주도로 이루어집니다.

정리:

  • Akka는 비즈니스 모델 확립과 지속 가능한 개발을 위해 BSL을 적용하여 상업적 사용에 조건을 둔 반면,
  • Pekko는 Apache License 2.0을 유지하며 기존의 완전한 오픈소스 정신과 자유도를 보장합니다.


마이그레이션 

var akka_version = "2.6.0"
val scala_version = "2.13"
val pekko_version = "1.1.2"
val pekko_r2dbc_version = "1.0.0"
val pekko_jdbc_version = "1.1.0"

dependencies {
	// Typed Actor
    implementation("org.apache.pekko:pekko-actor-typed_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-serialization-jackson_$scala_version:$pekko_version")

    // Actor Persistence
    implementation("org.apache.pekko:pekko-persistence-typed_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-persistence-query_$scala_version:$pekko_version")
    implementation("org.apache.pekko:pekko-persistence-jdbc_$scala_version:$pekko_jdbc_version")
    implementation("org.apache.pekko:pekko-persistence-r2dbc_$scala_version:$pekko_r2dbc_version")
}
  • akka 에서 org.apache.pekko 로변경하면 됩니다.
  • 대응 하는 akka버전은 2.6.x 이며 , pekko 의 버전은 1.1.2 입니다.
  • r2dbc는 rective stream 버전을 따르며 1.0.0 입니다. 


액터모델 자체는 AKKA진영에서 오랫동안 디벨롭했기때문에 오픈버전인 쿨다운버전을 사용해도 성숙도가 쌓였기때문에 큰 문제가 없지만

여기서 문제는 액터모델을 영속화 하는 장치로 ReactiveStream기반의 R2DBC지원 버전입니다.


최신 버전은 1.30 까지 나온상태이지만 쿨다운 버전 호환성이 1.0.0 이며 R2DBC가 아직 성숙도가 쌓이지 않고 약간의 버그를 가지고 있는 버전이기도합니다.

이시기 자바진영에서 해당 버전을 먼저 사용했다고 하면  데이테베이스 데이터형 불일치로 변환과정중에 다음과 같은 오류가 핵심드라이버에 존재해 별도의 코드로 대응해야하는 이슈가 있습니다.

  • long 을 int 로 변환할수 없습니다. 


비교적 안정적인 JDBC를 사용해 해결할수도 있겠지만, R2DBC는 이제 성숙도에 오른것같고 다음과같이 포기하지 못할 성능점 이점이 있으며

웹영역 역시 스레드기반의 MVC가 아닌 Reactive를 채택함으로 API호출  효율화를 함께 가져갈수 있습니다


JDBC의 특징:

  1. 블로킹(Blocking) I/O 기반:

    • JDBC는 데이터베이스 쿼리를 실행할 때 호출한 스레드가 결과를 받을 때까지 대기(블로킹)합니다.
    • 동기적 호출 방식이므로, 하나의 스레드가 쿼리 결과를 가져오기 위해 잠깐이라도 대기하면 해당 스레드는 그 시간 동안 다른 일을 처리하지 못합니다.
  2. 스레드풀 기반 확장:

    • 확장성을 확보하기 위해 JDBC를 사용할 때는 일반적으로 커넥션 풀과 충분한 스레드풀을 구성해야 합니다.
    • 많은 요청이 동시에 들어오는 상황에서는 스레드 수 증가와 컨텍스트 스위칭 등의 오버헤드가 커질 수 있습니다.
  3. 전통적이고 광범위한 지원:

    • 오랜 기간 표준으로 자리 잡은 JDBC는 다양한 DB 드라이버, 프레임워크, 라이브러리에서 폭넓게 지원됩니다.
    • 성숙한 생태계와 풍부한 레퍼런스를 가지고 있습니다.

R2DBC의 특징 (Reactive Streams 중심):

  1. 논블로킹(Non-Blocking) I/O 기반:

    • R2DBC는 Reactive Streams 사양을 기반으로 하여 데이터베이스와의 상호작용을 논블로킹 비동기 방식으로 처리합니다.
    • 데이터베이스 쿼리 결과를 Publisher 형태로 반환하기 때문에, 호출 즉시 스레드를 점유하지 않고 결과가 준비될 때 비동기적으로 데이터를 흘려보냅니다.
  2. Reactive Streams와 백프레셔(Backpressure) 지원:

    • R2DBC는 Reactor, RxJava 등 Reactive Streams 기반 라이브러리들과 자연스럽게 통합됩니다.
    • 데이터 소비 측(구독자)이 처리 가능한 양에 따라 요청하는 backpressure를 지원하며, 데이터베이스로부터 적절한 속도로 데이터를 가져올 수 있습니다.
    • 이를 통해 처리량 관리가 유연해지고, 스레드 낭비나 불필요한 메모리 소비를 줄일 수 있습니다.
  3. 적은 스레드로도 높은 확장성:

    • 스레드를 블로킹하지 않으므로, 일반적으로 적은 수의 스레드로도 많은 동시성을 처리할 수 있습니다.
    • 고성능 네트워크 I/O나 대규모 애플리케이션에서 유리하며, Reactive Web 프레임워크(Spring WebFlux 등)와 결합하면 효율적인 자원 활용을 기대할 수 있습니다.

정리:

  • JDBC는 전통적이고 폭넓게 사용되며, 동기적·블로킹 호출 방식을 통해 직관적으로 다룰 수 있지만, 많은 부하가 걸리는 환경에서는 스레드와 자원 사용 최적화가 어려울 수 있습니다.
  • R2DBC는 Reactive Streams를 기반으로 비동기·논블로킹 방식으로 동작하여, 적은 스레드로도 높은 동시성과 효율적인 리소스 활용을 가능하게 하며 Reactive 프로그래밍 패러다임과 자연스럽게 어울립니다.



쿨다운버전에서 R2DBC지원버전이 올라가기를 기다리거나, 이미 오랫동안 안전성을 유지한 JDBC버전을 사용하면 되지만

객체를 직접 데이터에 저장할때 용이한 방식이 ORM(JPA)라고 하면 액터모델에 발생하는 이벤트를 CQRS기법으로 저장을 지원하는것이  Perstence 장치이지만

이 장치를 포기하고 Mybatis를 이용하는것과같이 높은수준의 추상화를 포기하고 Repository 패턴을 구현해 의존성을 없애버리는 전략을 채택하였습니다.



Repository 패턴

DDL

CREATE TABLE IF NOT EXISTS durable_state (
                                             slice INT NOT NULL,
                                             entity_type VARCHAR(255) NOT NULL,
                                             persistence_id VARCHAR(255) NOT NULL,
                                             revision BIGINT NOT NULL,
                                             db_timestamp TIMESTAMP(6) NOT NULL,

                                             state_ser_id INTEGER NOT NULL,
                                             state_ser_manifest VARCHAR(255),
                                             state_payload BLOB NOT NULL,
                                             tags TEXT, -- FIXME no array type, is this the best option?

                                             PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);

  • https://github.com/apache/pekko-persistence-r2dbc/tree/main/ddl-scripts
    • CQRS를 지원하는 다양한 DDL은  Database별로 확인할수 있습니다. 
  • Durable은 이벤트 상태 변화의 마지막 상태를 저장하고 복원지원하는 스키마로 CRUD와 유사하게 작동하는 CQRS용 테이블입니다. 
  • 모든 변화 이벤트를 다루는 이벤트 소싱에서는 저널이라는 스키마를 이용할수 있습니다.


R2DBC를 이용하기위해 Webplux를 이용하는 비동기 처리방식으로 구현되었습니다.

DAO/DTO

interface DurableStateDAO {

    // Pure Version
    fun findAll(): Flux<DurableState>
    fun findById(persistenceId: String, revision: Long): Mono<DurableState>
    fun create(durableState: DurableState): Mono<DurableState>
    fun update(durableState: DurableState): Mono<DurableState>
    fun createOrUpdate(durableState: DurableState): Mono<DurableState>
    fun deleteById(persistenceId: String, revision: Long): Mono<Void>

    // Template Version for Actor
    fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T>
    fun <T: Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T>
}

data class DurableState(
    val slice: Int,
    val entityType: String,
    val persistenceId: String,
    val revision: Long,
    val dbTimestamp: LocalDateTime,
    val stateSerId: Int,
    val stateSerManifest: String,
    val statePayload: ByteArray,
    val tags: String
){...}


Repotisoty 구현체

@Repository
class DurableStateDAOImpl(private val connectionFactory: ConnectionFactory) : DurableStateDAO {

    private val client = DatabaseClient.create(connectionFactory)

    private val objectMapper = jacksonObjectMapper()

    override fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T> {
        return client.sql("SELECT * FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .map { row, _ ->
                val entityType = row.get("entity_type", String::class.java)!!
                val statePayload = row.get("state_payload", ByteArray::class.java)!!
                objectMapper.readValue(statePayload, Class.forName(entityType)) as T
            }
            .one()
            .switchIfEmpty(Mono.justOrEmpty(null))
    }

    override fun <T : Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T> {
        val entityType = entity::class.java.name
        val statePayload = objectMapper.writeValueAsBytes(entity)
        val durableState = DurableState(
            slice = 0, // Set appropriate values
            entityType = entityType,
            persistenceId = persistenceId,
            revision = revision,
            dbTimestamp = java.time.LocalDateTime.now(),
            stateSerId = 0, // Set appropriate values
            stateSerManifest = "",
            statePayload = statePayload,
            tags = ""
        )
        return createOrUpdate(durableState).thenReturn(entity)
    }

    override fun findAll(): Flux<DurableState> {
        return client.sql("SELECT * FROM durable_state")
            .map { row, _ ->
                DurableState(
                    row.get("slice", Int::class.java)!!,
                    row.get("entity_type", String::class.java)!!,
                    row.get("persistence_id", String::class.java)!!,
                    row.get("revision", Long::class.java)!!,
                    row.get("db_timestamp", java.time.LocalDateTime::class.java)!!,
                    row.get("state_ser_id", Int::class.java)!!,
                    row.get("state_ser_manifest", String::class.java )!!,
                    row.get("state_payload", ByteArray::class.java)!!,
                    row.get("tags", String::class.java)!!
                )
            }
            .all()
    }

    override fun findById(persistenceId: String, revision: Long): Mono<DurableState> {
        return client.sql("SELECT * FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .map { row, _ ->
                DurableState(
                    row.get("slice", Int::class.java)!!,
                    row.get("entity_type", String::class.java)!!,
                    row.get("persistence_id", String::class.java)!!,
                    row.get("revision", Long::class.java)!!,
                    row.get("db_timestamp", java.time.LocalDateTime::class.java)!!,
                    row.get("state_ser_id", Int::class.java)!!,
                    row.get("state_ser_manifest", String::class.java)!!,
                    row.get("state_payload", ByteArray::class.java)!!,
                    row.get("tags", String::class.java)!!
                )
            }
            .one()
            .switchIfEmpty(Mono.justOrEmpty(null))
    }

    override fun create(durableState: DurableState): Mono<DurableState> {
        return client.sql("INSERT INTO durable_state (slice, entity_type, persistence_id, revision, db_timestamp, state_ser_id, state_ser_manifest, state_payload, tags) VALUES (:slice, :entityType, :persistenceId, :revision, :dbTimestamp, :stateSerId, :stateSerManifest, :statePayload, :tags)")
            .bind("slice", durableState.slice)
            .bind("entityType", durableState.entityType)
            .bind("persistenceId", durableState.persistenceId)
            .bind("revision", durableState.revision)
            .bind("dbTimestamp", durableState.dbTimestamp)
            .bind("stateSerId", durableState.stateSerId)
            .bind("stateSerManifest", durableState.stateSerManifest)
            .bind("statePayload", durableState.statePayload)
            .bind("tags", durableState.tags)
            .then()
            .thenReturn(durableState)
    }

    override fun update(durableState: DurableState): Mono<DurableState> {
        return client.sql("UPDATE durable_state SET slice = :slice, entity_type = :entityType, db_timestamp = :dbTimestamp, state_ser_id = :stateSerId, state_ser_manifest = :stateSerManifest, state_payload = :statePayload, tags = :tags WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("slice", durableState.slice)
            .bind("entityType", durableState.entityType)
            .bind("dbTimestamp", durableState.dbTimestamp)
            .bind("stateSerId", durableState.stateSerId)
            .bind("stateSerManifest", durableState.stateSerManifest)
            .bind("statePayload", durableState.statePayload)
            .bind("tags", durableState.tags)
            .bind("persistenceId", durableState.persistenceId)
            .bind("revision", durableState.revision)
            .then()
            .thenReturn(durableState)
    }

    override fun createOrUpdate(durableState: DurableState): Mono<DurableState> {
        return findById(durableState.persistenceId, durableState.revision)
            .flatMap { _ ->
                update(durableState).thenReturn(durableState)
            }
            .switchIfEmpty(create(durableState).thenReturn(durableState))
    }

    override fun deleteById(persistenceId: String, revision: Long): Mono<Void> {
        return client.sql("DELETE FROM durable_state WHERE persistence_id = :persistenceId AND revision = :revision")
            .bind("persistenceId", persistenceId)
            .bind("revision", revision)
            .then()
    }
}

@Repository
class DurableRepository(private val durableStateDAO: DurableStateDAO) {

    fun getAllDurableStates(): Flux<DurableState> {
        return durableStateDAO.findAll()
    }

    fun getDurableStateById(persistenceId: String, revision: Long): Mono<DurableState> {
        return durableStateDAO.findById(persistenceId, revision)
    }

    fun createDurableState(durableState: DurableState): Mono<DurableState> {
        return durableStateDAO.create(durableState)
    }

    fun updateDurableState(durableState: DurableState): Mono<DurableState> {
        return durableStateDAO.update(durableState)
    }

    fun createOrUpdateDurableState(durableState: DurableState) : Mono<DurableState>{
        return durableStateDAO.createOrUpdate(durableState)
    }

    fun deleteDurableStateById(persistenceId: String, revision: Long): Mono<Void> {
        return durableStateDAO.deleteById(persistenceId, revision)
    }

    fun<T: Any> createOrUpdateDurableStateEx(persistenceId: String, revision: Long, entity: T) : Mono<T> {
       return durableStateDAO.createOrUpdateEx(persistenceId, revision, entity)
    }

    fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T>{
        return durableStateDAO.findByIdEx(persistenceId, revision)
    }
}


R2DBC를 레파지토리화 하느라 코드가 장황했지만 최초 복원용을 위한 Read ,  마지막 상태값 저장(persist)를 위해

CreateOrUpdata 저장 함수만 이용하며 Value객체의 자동 직렬화/역직렬화를 위해 자바객체를 모두 받을수 있는 템플릿 형태로 구현되었습니다.

  • fun <T> findByIdEx(persistenceId: String, revision: Long): Mono<T>
  • fun <T: Any> createOrUpdateEx(persistenceId: String, revision: Long, entity: T): Mono<T>

커스텀한 영속성이 적용한 액터모델 구현

sealed class HelloStateStoreActorCommand
data class HelloStore(val message: String, val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class GetHelloCountStore(val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class GetHelloTotalCountStore(val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
data class ChangeStateStore(val newHappy: HappyState) : HelloStateStoreActorCommand()
data class HelloLimitStore(val message: String, val replyTo: ActorRef<Any>) : HelloStateStoreActorCommand()
object ResetHelloCountStore : HelloStateStoreActorCommand()
object StopResetTimer : HelloStateStoreActorCommand()

/** HelloStateActor 반환할 수 있는 응답들 */
sealed class HelloStateStoreActorResponse
data class HelloResponseStore(val message: String) : HelloStateStoreActorResponse()
data class HelloCountResponseStore(val count: Int) : HelloStateStoreActorResponse()

/** 상태 정의 */
enum class HappyState {
    HAPPY, ANGRY
}

data class HelloStoreState (
    var happyState: HappyState,
    var helloCount: Int,
    var helloTotalCount: Int
)

/** HelloStateActor 클래스 */
class HelloStateStoreActor private constructor(
    private val context: ActorContext<HelloStateStoreActorCommand>,
    private val timers: TimerScheduler<HelloStateStoreActorCommand>,
    private val persistenceId : String,
    private val durableRepository: DurableRepository,
) : AbstractBehavior<HelloStateStoreActorCommand>(context) {

    companion object {
        fun create(persistenceId: String, durableRepository: DurableRepository): Behavior<HelloStateStoreActorCommand> {
            return Behaviors.withTimers { timers ->
                Behaviors.setup { context -> HelloStateStoreActor(context, timers, persistenceId, durableRepository) }
            }
        }
    }

    private lateinit var helloStoreState: HelloStoreState

    init {

        initStoreState()

        //timers.startTimerAtFixedRate(ResetHelloCountStore, Duration.ofSeconds(60))

    }

    override fun createReceive(): Receive<HelloStateStoreActorCommand> {
        return newReceiveBuilder()
            .onMessage(HelloStore::class.java, this::onHello)
            .onMessage(HelloLimitStore::class.java, this::onHelloLimit)
            .onMessage(GetHelloCountStore::class.java, this::onGetHelloCount)
            .onMessage(GetHelloTotalCountStore::class.java, this::onGetHelloTotalCount)
            .onMessage(ChangeStateStore::class.java, this::onChangeState)
            .onMessage(ResetHelloCountStore::class.java, this::onResetHelloCount)
            .onMessage(StopResetTimer::class.java) {
                timers.cancel(ResetHelloCountStore)
                Behaviors.same()
            }
            .build()
    }

    private val materializer = Materializer.createMaterializer(context.system)

    private val helloLimitSource = Source.queue<HelloLimitStore>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd ->
            context.self.tell(HelloStore(cmd.message, cmd.replyTo))
        })
        .run(materializer)

    private fun onHello(command: HelloStore): Behavior<HelloStateStoreActorCommand> {
        when (helloStoreState.happyState) {
            HappyState.HAPPY -> {
                if (command.message == "Hello") {
                    helloStoreState.helloCount++
                    helloStoreState.helloTotalCount++

                    persist(helloStoreState)

                    command.replyTo.tell(HelloResponseStore("Kotlin"))
                    context.log.info("onHello-Kotlin")
                }
            }
            HappyState.ANGRY -> {
                command.replyTo.tell(HelloResponseStore("Don't talk to me!"))
            }
        }
        return this
    }

    private fun onHelloLimit(command: HelloLimitStore): Behavior<HelloStateStoreActorCommand> {
        helloLimitSource.offer(command).thenAccept { result ->
            when (result) {
                is QueueOfferResult.`Enqueued$` -> context.log.info("Command enqueued successfully")
                is QueueOfferResult.`Dropped$` -> context.log.error("Command dropped")
                is QueueOfferResult.Failure -> context.log.error("Failed to enqueue command", result.cause())
                is QueueOfferResult.`QueueClosed$` -> context.log.error("Queue was closed")
            }
        }
        return this
    }

    private fun onGetHelloCount(command: GetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloCount))
        context.log.info("onGetHelloCount-helloCount: ${helloStoreState.helloCount}")
        return this
    }

    private fun onGetHelloTotalCount(command: GetHelloTotalCountStore): Behavior<HelloStateStoreActorCommand> {
        command.replyTo.tell(HelloCountResponseStore(helloStoreState.helloTotalCount))
        return this
    }

    private fun onChangeState(command: ChangeStateStore): Behavior<HelloStateStoreActorCommand> {
        helloStoreState = helloStoreState.copy(
            happyState = command.newHappy
        )

        persist(helloStoreState)
        return this
    }

    private fun onResetHelloCount(command: ResetHelloCountStore): Behavior<HelloStateStoreActorCommand> {
        context.log.info("Resetting hello count")
        helloStoreState.helloCount = 0

        persist(helloStoreState)
        return this
    }

    private fun persist(newState: HelloStoreState) {
        runBlocking {
            durableRepository.createOrUpdateDurableStateEx(persistenceId, 1L, newState).awaitSingle()
        }
    }

    private fun initStoreState() {
        runBlocking {
            var result = durableRepository.findByIdEx<HelloStoreState>(persistenceId, 1L).awaitFirstOrNull()
            if(result != null) {
                helloStoreState = result
            }
            else{
                helloStoreState = HelloStoreState(HappyState.HAPPY, 0, 0)
            }
        }
    }
}
  • 액터모델내에 코르틴 블락킹 코드가 사용된점은 개선 성능튜닝 포인트이며 AkkaStream을 통해 개선될수 있습니다.
    • SpringBoot 코르틴영역과 분리되어 Akka시스템 내에서 분리 작동되고 동시성/병렬성을 제어할수 있기때문에 웹프레인워크를 블락킹 시키지는 않을것으로 보여집니다.


유닛테스트 코드

@ExtendWith(SpringExtension::class)
@DataR2dbcTest
@ActiveProfiles("test")
@ComponentScan(basePackages = ["org.example.kotlinbootreactivelabs.repositories.durable"])
class HelloStateStoreActorTest {

    @Autowired
    lateinit var durableRepository: DurableRepository

    companion object {
        private lateinit var testKit: ActorTestKit

        @BeforeAll
        @JvmStatic
        fun setup() {
            testKit = ActorTestKit.create()
        }

        @AfterAll
        @JvmStatic
        fun teardown() {
            testKit.shutdownTestKit()
        }
    }

    @Test
    fun testHelloStore() {
        val probe: TestProbe<Any> = testKit.createTestProbe()
        val helloStateStoreActor = testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

        helloStateStoreActor.tell(ChangeStateStore(HappyState.HAPPY))
        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))
        probe.expectMessage(HelloResponseStore("Kotlin"))

    }

    @Test
    fun testChangeState() {
        val probe: TestProbe<Any> = testKit.createTestProbe()
        val helloStateStoreActor = testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

        helloStateStoreActor.tell(ChangeStateStore(HappyState.ANGRY))
        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))
        probe.expectMessage(HelloResponseStore("Don't talk to me!"))
    }

    @Test
    fun testResetHelloCount()  {
        val probe: TestProbe<Any> = testKit.createTestProbe()
        val helloStateStoreActor = testKit.spawn(HelloStateStoreActor.create("test-persistence-id", durableRepository))

        helloStateStoreActor.tell(ChangeStateStore(HappyState.HAPPY))
        helloStateStoreActor.tell(ResetHelloCountStore)

        helloStateStoreActor.tell(GetHelloCountStore(probe.ref()))
        probe.expectMessage(HelloCountResponseStore(0))

        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))
        helloStateStoreActor.tell(HelloStore("Hello", probe.ref()))

        probe.expectMessage(HelloResponseStore("Kotlin"))
        probe.expectMessage(HelloResponseStore("Kotlin"))

        helloStateStoreActor.tell(GetHelloCountStore(probe.ref()))
        probe.expectMessage(HelloCountResponseStore(2))

    }
}

커스텀하게 구현된 액터영속성 정품보다 코드흐름보다 나이스하지는 않지만 작동코드와 최종 작동방식은 유사하게 이용할수 있습니다. 

쿨다운 버전인 Pekko를 채택할때 R2DBC와의 호환문제가 가장 걸림돌이 였으나~ 레파지토리 패턴을 구현해두면 데이터를 다루는 방식도 일관성을 유지할수 있으며

데이터베이스 종속적이지 않게 다양한 데이터 베이스를 채택할수 있는것도 장점입니다. 정품의 경우 지원하는 데이터베이스의 플러그인을 포함 리액티브 스트림의 버전을 잘 준수하는지 여부도 중요합니다.

이 부분에대해서는 Mysql진영이 거의 늦거나 분산저장은 잘 지원하지 못해 slice라는 논리적키 생성→물리적분리 라는 전략을 채택하기 어려우며

Reactive 모드에서 DB를 처음 채택한다고하면 이 진영의 지원이 활발한 Postgre Base를 권장합니다. 


이상 Pekko를 통해 알아본 영속성장치를 커스텀하게 하기 였습니다.




  • No labels