You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Current »

DurableState는 이벤트 발생시 변화되는 인메모리 객체의 마지막 상태를 저장하는것을 의미하며 

상태로 설계된 객체가 복원을 위해 이용될수 있으며 CRUD랑 유사하게 작동하며

Journal은 발생 이벤트를 모두 기록해 이벤트 소싱과 같이 시계열이 필요하고 이벤트 재생이 필요한곳에 이용될수 있습니다.

SnapShot은 Journal과 함께 이용되어  이벤트소스로부터 너무 많은 재생이 일어나지않게 방지를 하는 장치입니다. 


위와같은 개념을 적용해 비교적 쉬운 샘플로 구현을 시도해보겠습니다.

  • AKKA 액터모델
  • AKKA 의존없는 코틀린지원 순수 액터모델
  • 액터모델 의존 없는, Kafka Stream 활용

Actor Moel By AKKA




액터는 메시지를 실시간으로 받을수 있으며 그 메시지를 DB에 저장할수 있는 영속화(Persitent) 기능을 지원합니다.

이벤트를 먼저 설계한후 저장장치를 이후 고민할수 있는것이 액터모델의 특징이며 

다음과같은 영속성장치를 스키마모델과 함께 제공합니다.

Journal

  • 이벤트 소싱(Event Sourcing) 방식을 사용합니다
  • 액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다
  • 추가 전용(append-only) 로그 형태로 이벤트를 저장합니다
  • 액터의 전체 상태 변경 이력을 보존합니다.
  • 액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다

Snapshot

  • 액터의 전체 상태를 특정 시점에 저장합니다
  • 복구 시간을 최적화하기 위한 용도로 사용됩니다
  • 전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다
  • Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다

Durable State

  • 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
  • CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
  • 상태 변경 시마다 전체 상태를 덮어씁니다.


주요 차이점:

  • 저장 방식: Journal은 이벤트 로그, Snapshot은 전체 상태의 특정 시점 복사본, Durable State는 최신 상태만 저장합니다.
  • 복구 프로세스: Journal은 모든 이벤트 재생, Snapshot은 최근 스냅샷 + 이후 이벤트 재생, Durable State는 최신 상태만 로드합니다.
  • 데이터 보존: Journal은 전체 이력 보존, Snapshot과 Durable State는 특정 시점/최신 상태만 보존합니다.
  • 사용 사례: Journal은 감사와 시간 기반 쿼리에 유용, Snapshot은 복구 최적화, Durable State는 단순한 상태 관리에 적합합니다.


여기서는 Durable State 영속장치를 알아보고 나머지 장치도 알아보겠습니다.

준비하기 

의존성

	// Actor Persistence
	implementation("com.typesafe.akka:akka-persistence-typed_$scalaVersion:$akkaVersion")
	implementation("com.typesafe.akka:akka-persistence-query_$scalaVersion:$akkaVersion")
	implementation("com.lightbend.akka:akka-persistence-r2dbc_$scalaVersion:$akkaR2DBC")


DDL

CREATE TABLE IF NOT EXISTS durable_state (
    slice INT NOT NULL,
    entity_type VARCHAR(255) NOT NULL,
    persistence_id VARCHAR(255) NOT NULL,
    revision BIGINT NOT NULL,
    db_timestamp timestamp with time zone NOT NULL,
    state_ser_id INTEGER NOT NULL,
    state_ser_manifest VARCHAR(255),
    state_payload BYTEA NOT NULL,
    tags TEXT ARRAY,

PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);


  • https://doc.akka.io/libraries/akka-persistence-r2dbc/current/index.html 
  • 상태의 속성은 Value 객체로, state_payload 에 json형태로 저장됩니다.  저장공간을 줄이기위해 압축 바이너리로 저장도 할수 있으며 직렬화/압축 장치는 커스텀하거나 선택할수 있습니다.
  • 마지막 상태값을 주로 추적하며 revision을 통해 변경횟수를 추적할수 있으며 분산저장을 하는경우 slice를 활용할수 있습니다. 


CONFIG

akka {
  loglevel = "INFO"
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    serializers {
      jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }

    serialization-bindings {
      "com.example.kotlinbootlabs.actor.PersitenceSerializable" = jackson-json
    }

    default-dispatcher {
      fork-join-executor {
        parallelism-min = 5
        parallelism-factor = 2.0
        parallelism-max = 10
      }
    }
  }

  extensions = [akka.persistence.Persistence]

  persistence {
    journal {
      plugin = "inmemory-journal"
    }

    snapshot-store {
      plugin = "inmemory-snapshot-store"
    }

    state {
      plugin = "akka.persistence.r2dbc.state"

    }

    r2dbc {
      # Durable state store
      state {
        class = "akka.persistence.r2dbc.state.R2dbcDurableStateStoreProvider"
        table = "durable_state"
        assert-single-writer = on
      }

      dialect = "postgres"
      connection-factory {
        driver = "postgres"
        host = "localhost"
        database = "postgres"
        user = "postgres"
        password = "postgres"
        max-pool-size = 20
        initial-size = 10
        max-idle-time = 30000
        # ssl {
        #   enabled = on
        #   mode = "VERIFY_CA"
        #   root-cert = "/path/db_root.crt"
        # }
      }
    }
  }
}


액터로 구현된 모델의 작동은 설정화로 도메인과 상관없는 인프라및 성능부분이 분리되어 정의되어 기본 코드는
도메인만 표현할수 있는 장점이 있긴하지만  이러한 특성으로 akka.conf 설정과 싸움을 해야할수도 있습니다. 

  • 직렬화 방식
  • 스레드풀 지정
  • 각종 커낵션 구성
  • 클러스터 구성된경우 롤성정


구현 액터모델

행복한 상태일때만 인사에 응답하고 카운팅을 하는 HeloStateActor 에 영속성만 부여한 버전입니다.

enum class State {
    HAPPY,
    ANGRY
}

sealed class HelloPersistentStateActorCommand :PersitenceSerializable
data class HelloPersistentDurable(val message: String, val replyTo: ActorRef<Any>) : HelloPersistentStateActorCommand()
data class GetHelloCountPersistentDurable(val replyTo: ActorRef<Any>) : HelloPersistentStateActorCommand()
data class GetHelloTotalCountPersitentDurable(val replyTo: ActorRef<Any>) : HelloPersistentStateActorCommand()

//val newState: State
data class ChangeState @JsonCreator constructor(
    @JsonProperty("newState")
    val newState: State,
) : HelloPersistentStateActorCommand()


object ResetHelloCount : HelloPersistentStateActorCommand()

sealed class HelloPersistentStateActorResponse
data class HelloResponse(val message: String) : HelloPersistentStateActorResponse()
data class HelloCountResponse(val count: Number) : HelloPersistentStateActorResponse()


class HelloPersistentDurableStateActor private constructor(
    private val context: ActorContext<HelloPersistentStateActorCommand>,
    private val persistenceId: PersistenceId
) : DurableStateBehavior<HelloPersistentStateActorCommand, HelloState>(persistenceId) {

    companion object {
        fun create(persistenceId: PersistenceId): Behavior<HelloPersistentStateActorCommand> {
            return Behaviors.setup { context -> HelloPersistentDurableStateActor(context, persistenceId) }
        }
    }

    override fun tag(): String {
        return "tag1"
    }

    override fun emptyState(): HelloState = HelloState(State.HAPPY, 0, 0)

    override fun commandHandler(): CommandHandler<HelloPersistentStateActorCommand, HelloState> {
        return newCommandHandlerBuilder()
            .forAnyState()
            .onCommand(HelloPersistentDurable::class.java) { state, command -> onHello(state, command) }
            .onCommand(GetHelloCountPersistentDurable::class.java) { state, command -> onGetHelloCount(state, command) }
            .onCommand(GetHelloTotalCountPersitentDurable::class.java) { state, command -> onGetHelloTotalCount(state, command) }
            .onCommand(ChangeState::class.java) { state, command -> onChangeState(state, command) }
            .onCommand(ResetHelloCount::class.java) { state, _ -> onResetHelloCount(state) }
            .build()
    }

    private fun onHello(state: HelloState, command: HelloPersistentDurable): Effect<HelloState> {
        return when (state.state) {
            State.HAPPY -> {
                if (command.message == "Hello") {
                    context.log.info("onHello-Kotlin")
                    val newState = state.copy(
                        helloCount = state.helloCount+1,
                        helloTotalCount = state.helloTotalCount + 1,
                    )

                    Effect().persist(newState).thenRun {
                        command.replyTo.tell(HelloResponse("Kotlin"))
                    }

                } else {
                    Effect().none()
                }
            }
            State.ANGRY -> {
                command.replyTo.tell(HelloResponse("Don't talk to me!"))
                Effect().none()
            }
        }
    }

    private fun onGetHelloCount(state: HelloState, command: GetHelloCountPersistentDurable): Effect<HelloState> {
        command.replyTo.tell(HelloCountResponse(state.helloCount))
        context.log.info("onGetHelloCount-helloCount: ${state.helloCount}")
        return Effect().none()
    }

    private fun onGetHelloTotalCount(state: HelloState, command: GetHelloTotalCountPersitentDurable): Effect<HelloState> {
        command.replyTo.tell(HelloCountResponse(state.helloTotalCount))
        return Effect().none()
    }

    private fun onChangeState(state: HelloState, command: ChangeState): Effect<HelloState> {
        val newState = state.copy(state = command.newState)
        return Effect().persist(newState)
    }

    private fun onResetHelloCount(state: HelloState): Effect<HelloState> {
        context.log.info("Resetting hello count")
        val newState = state.copy(helloCount = 0)
        return Effect().persist(newState)
    }
}

git : https://github.com/psmon/java-labs/tree/master/KotlinBootLabs/src/main/kotlin/com/example/kotlinbootlabs/actor/persistent


기본 액터에 추가된 개념

  • 생성자 : AbstractBehavior(기본액터) - > DurableStateBehavior 로 상속을 변경하면 되며 두번째 인자가 상태를 저장할 Type과 생성시 고유 식별자인 PersitenceID를 부여받습니다. 
  • 초기상태 : emptyState - 최초 수행시 수행되는 초기값 설정입니다.
  • 상태변경 : Effect().persist(newState) 를 통해 새로운 상태로 지정합니다.
  • 이펙트는 부수효과로~ 명령에 따라 부수효과를 줄수 있습니다. 이 사이드이펙트의 효과가 우리의 기대결과와 다르게 작동하면 디펙이 됩니다.


TEST CODE

        val probe: TestProbe<Any> = testKit.createTestProbe()
        val persistenceId = PersistenceId.ofUniqueId("HelloPersistentStateActor2")
        val helloPersistentDurableStateActor = testKit.spawn(HelloPersistentDurableStateActor.create(persistenceId))

        // Send Hello messages
        helloPersistentDurableStateActor.tell(HelloPersistentDurable("Hello", probe.ref()))
        helloPersistentDurableStateActor.tell(HelloPersistentDurable("Hello", probe.ref()))

        probe.expectMessage(HelloResponse("Kotlin"))
        probe.expectMessage(HelloResponse("Kotlin"))

        // Verify the hello count
        helloPersistentDurableStateActor.tell(GetHelloCountPersistentDurable(probe.ref()))
        probe.expectMessage(HelloCountResponse(2))
  • 이제 이 액터는 영속성기능이 부여되었기때문에 , 중단되어 다시 가동되더라도 마지막 상태를 보유한체로 작동이됩니다.



이제 액터 모델의 상태변경에 따라 저장장치를 채택해 동기화되는 유용한 기능을 탑재하게 되었습니다.


이러한 장치와 모델을 AKKA액터에서 제공하기때문에 CQRS가 더이상 이론이 아닌 유닛테스트를 해보고 우리의 장치에 실행해볼수 있습니다.

CRUD의 대표적인 연습 샘플이 게시판 만들기라고 하면CQRS의 대표적인 연습 샘플은 장바구니 만들기로 대용량의 트래픽을 처리할수 있도록

고안된 다음 장바구니 예제를 보는것을 추천합니다. ( 이벤트 소싱이 적용)



Next : 액터를 클러스터로 배치하기






Actor Model By 코틀린


앞장에서 설명한 PersistentDurableStateActor 은 상태있는 서비스에서 분산을 했을때 AKKA를 이용한 분산상태 개발방식입니다.

상태없는 개발방식과 없는 방식의 차이를 먼저알아고보 AKKA의 액터모델이 아닌, 코틀린 순수액터모델을 이용해 유사하게 구현해보고

읽기성능이 얼마나 빨라질수 있는지 확인을 해보겠습니다.


상태없는 서비스 VS 상태 있는서비스 장단점 요약

특성상태없는 서비스상태 있는 서비스
병목 원인데이터베이스, 외부 API, 캐시 사용으로 부하 증가상태 동기화, 세션 관리, 중앙 상태 저장소로 부하 증가
확장성수평 확장이 용이하며 병목을 분산 처리 가능상태 동기화 필요로 인해 확장성이 제한됨
장애 복구장애 복구가 상대적으로 단순병목 지점의 상태 복구가 복잡
최적화 방향요청 간 상태 독립성 활용, 캐싱 및 병목 지점 분산상태 동기화 최적화, 병목 지점 클러스터링
적합한 사례대규모 웹 애플리케이션, REST API, 서버리스 설계게임 서버, 채팅, 실시간 세션 기반 시스템


일반적인 웹개발에서 상태없는 서비스로 작성하는것이 이득이 있지만 , 게임또는 채팅이 진행되는동안 상태업데이트가 지속 일어나는경우

상태있는 객체를 설계해야할수도 있습니다.


간단한 게임및 비교적 간단한 채팅의 경우 상태없는 서비스로도 충분히 설계가 가능할수 있으며 꼭 어느 한가지방식만 장점이 있을수 있다고 볼수 없으며

다음과 같이 트레이드 오프가 발생할수 있습니다. 

상태관리 객체를 만드는 가장 큰 단점은 구현의 난이도에 있습

니다.

성능 트레이드오프

장점

  • 로컬 상태 관리로 요청 처리 속도가 빨라짐.
  • 네트워크 비용 감소와 높은 동시성 처리.
  • 읽기와 쓰기의 독립적 최적화.
  • 데이터베이스 성능 의존을 줄일수 있음

단점

  • 상태 동기화와 장애 복구(예: 인스턴스 장애 시 상태 손실) 복잡성 증가.
  • 상태 저장소와 메모리 사용량 증가.
  • 서버 장애 시 상태 복구를 위한 이벤트 로그 또는 스냅샷 관리 필요.


액터모델은 기본적으로 상태를 관리하며 라이프 사이클이 웹(REST API)에서 요청하는 사이클보다 긴 사이클의 상태관리를 할때 활용할수 있습니다.

시도되는 코드

  • 조건에 따른 상태저장 : 특정 사용자가 Hello를 하면 사용자별 카운트 1증가 , 기분이 나쁜 상태일때는 Hello를 거부 카운트 증가없음
  • 상태읽기 : 특정 사용자 Click 카운트를 확인 하기 위해 1000번 조회후 성능측정
  • 추가 미션 : 마지막 상태값 저장뿐만 아니라 상태 로그를 저장해 시계열분석이 가능하도록 데이터 처리


성능표

  • Total time for 100 Hello commands for Write: 73 ms
  • Average time per Hello command for Write: 0.73 ms
  • Total time for 1000 HelloCount for Read commands: 75 ms
  • Average time per HelloCount command: 0.075 ms


성능표를 먼저 공유하면~  저장시 마지막 상태는 Redis에 저장을 하고, 로그성은 Kafka에 저장합니다.

해당 객체가 상태를 가지고 있기때문에 읽기시 Redis로부터 값을 읽을 필요없이 로컬에서의 상태값값을 반환합니다.

Redis가 아무리 고성능 장치라고 하지만 1000회조회에 75ms 이내로 수행하기는 어렵습니다. 다음과 같은 결정적인 이유가 있기때문입니다.

  • Redis 를 1000번 호출한다는것은 인메모리에서 1000번 호출할것같지만  네트워크 호출 1000회가 포함되어 있습니다.
    • 상태없는 프로그래밍에서 최적화 지점은 Redis가 고성능이라고 믿고 있지만 네트워크 호출 횟수조차 줄여~ 단일지점 저장소의 부하를 어플리케이션에 분산하는것에 있습니다. 
    • Redis에서 저장용량을 줄이기위해 Json을 더 작은 바이너리 또는 Bit연산가능한 데이터로 저장할수도 있습니다.  이것은 저장공간을 줄일수 있지만 네트워크 호출 비용(Access) 을 결정적으로 줄이지는 못합니다.


이것을 모델 다이어그램으로 정리하면 다음과 같이 동작하게 됩니다.


  • Hello Count 를 질의하기위해 영속장치를 접근할 필요없이 , 인메모리(상태프로그래밍) 에서 정확한 값을 바로 응답할수 있습니다.
  • Redis에 마지막 값을 항상 유지함으로 ~ 업데이트 또는 장애복구시 Actor가 초기화될시 마지막값으로 상태를 복원해 시작할수 있습니다.
    • 마지막 상태값 유지를 위해, 꼭 Redis일 필요없으며, 인메모리 기능을 이미 가지고 있기때문에 RDB에 단지 마지막 값을 유지할수도 있습니다.  
    • 이 모델은 Read를 위해 매번 RDB 조회할 필요가 없으며 Read의 책임있는 DB를 인메모리가 아닌 다른곳에 위임할수도 있습니다. 
    • 상태있는 서비스를 통해 RDB만으로 Redis의 분산처리 캐시효과 줄수 있으며 영속성장치는 Redis보다는 RDB가 더 적합합니다. 이 샘플은 Redis의 Read속도보다 더 빠른장치를 만들고 비교하기위한 샘플입니다.
    • 액터자체를 분산처리하고 관리 하는방식은 여기서 제외되었습니다. ( Akka에서 지원하는 기능 )
  • 이벤트의 변화를 Kafka에 기록해둠으로 누군가는 이것을 소비해 시계열 기반 분석 기능을 작성할수도 있습니다. ( 이벤트 소싱패턴 활용한 다양한 기능을 구현 )



위 방식이 전통적인 CRUD 보다 분명 복잡하고 고려해야할 사항들은 더 있을수 있으며 위 방식을 단지 CRUD방식으로 풀어서 비교해보겠습니다.

전통적인 RDB를 이용한 CRUD 방식

CREATE TABLE user_state (
    user_id VARCHAR(255) PRIMARY KEY,
    state ENUM('HAPPY', 'ANGRY') NOT NULL,
    hello_count BIGINT NOT NULL,
    hello_total_count BIGINT NOT NULL
);

DELIMITER //

CREATE PROCEDURE increment_hello_count(
    IN p_user_id VARCHAR(255),
    IN p_amount BIGINT
)
BEGIN
    UPDATE user_state
    SET hello_count = hello_count + p_amount
    WHERE user_id = p_user_id AND state = 'HAPPY';
END //

DELIMITER ;

DELIMITER //

CREATE PROCEDURE get_user_state(
    IN p_user_id VARCHAR(255)
)
BEGIN
    SELECT state, hello_count, hello_total_count
    FROM user_state
    WHERE user_id = p_user_id;
END //

DELIMITER ;


전통적인 DB에서 쓰기와 읽기를 분리한다고 했을때 이것이 CQRS라고 생각하면 큰 착각이다. 우선 DB의 Read성능을 높이기위해 확장하는것은 DB1개를 더 두는것이기때문에 아주 값비싼 확장방식이다.

더욱이 사용자의 1카운트를 증가하기위해 Update또는 Create만 발생하는것이 아니라~  기존 값을 확인(Read)한후 증가하기때문에 Read와 Write(Update)비용이 증가함과 동시에 동시성 처리를 위해

사용자단위로 LockFree하지 않은 방식이 사용되었습니다.

CRUD가 항상 단점이 있는것은 아니며 다음과 CQRS대비 장단점이 존재합니다.

장점: CRUD

  • 데이터 영속성: RDB는 내구성 있는 저장소를 제공하여 애플리케이션이 충돌하더라도 데이터가 손실되지 않습니다.
  • ACID 트랜잭션: RDB는 원자성, 일관성, 고립성, 내구성을 지원하여 신뢰할 수 있는 트랜잭션을 보장합니다.
  • 유연한 쿼리: SQL을 사용하여 복잡한 쿼리와 조인을 수행할 수 있어 데이터 검색 및 조작이 용이합니다.
  • 확장성: RDB는 대용량 데이터셋을 처리할 수 있으며 샤딩과 복제를 통해 수평 확장을 지원합니다.
  • 백업 및 복구: RDB는 데이터 백업 및 복구를 위한 내장 메커니즘을 가지고 있습니다.

단점: CQRS

  • 지연 시간: RDB 작업은 디스크 I/O 및 네트워크 지연을 수반하므로 메모리 내 작업에 비해 느릴 수 있습니다.
  • 동시성: 높은 동시성을 처리하는 것은 도전적일 수 있으며, 병목 현상을 피하기 위해 신중한 트랜잭션 관리가 필요합니다.
  • 복잡성: 스키마 관리, 인덱스 최적화 및 쿼리 최적화는 애플리케이션에 복잡성을 추가할 수 있습니다.
  • 오버헤드: RDB는 ACID 속성을 유지하고 데이터 무결성을 보장하기 위해 오버헤드를 도입합니다.
  • 확장성: RDB는 확장 가능하지만, 분산 메모리 내 액터 시스템의 선형 확장성을 따라가지 못할 수 있습니다.



액터를 이용해 CQRS 패턴으로 구현하기

RedisService구현

implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")


@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {

    fun setValue(category: String, key: String, value: String): Mono<Boolean> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().set(compositeKey, value)
    }

    fun getValue(category: String, key: String): Mono<String?> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().get(compositeKey)
    }
}
  • 상태값을 가져오고 저장하는 코드는 심플하며, 복잡한 관계형 DB필요없이 공통으로 Value 객체를 이용하게 됩니다.


액터구현

코틀린 언어가 지원하는 순수 액터가 이용되었으며 

class HelloKTableActor(
        private val persistenceId:String ,
        private val producer: KafkaProducer<String, HelloKTableState>,
        private val redisService: RedisService
    ) {

    private val channel = Channel<HelloKTableActorCommand>()
    private var curState: HelloKTableState

    init {

        // Read initial state from Redis
        curState = redisService.getValue("hello-state-store", persistenceId)
            .map { stateJson ->
                // Deserialize stateJson to HelloKTableState
                // Assuming you have a method to deserialize JSON to HelloKTableState
                stateJson?.let { deserializeState(it) }
            }
            .block() ?: HelloKTableState(HelloKState.HAPPY, 0, 0) // Default state if not found

  • 액터는 고유 시별ID(논리적구분 여기서는 사용자별) 를 가지며 , 초기화시 Redis로부터 마지막 상태값을 읽어옵니다.
  • producer 는 이벤트 로그를 kafka에 전송하기위해 이용되었으며 이 장치는 없어도 동작에 영향을 끼치지 않습니다. - 시계열데이터가 있기때문에 이벤트 소싱에서 이용가능


이벤트 처리기

    private fun handleHello(command: HelloKtable) {
        if (curState.state == HelloKState.HAPPY && command.message == "Hello") {
            val newState = curState.copy(helloCount = curState.helloCount + 1, helloTotalCount = curState.helloTotalCount + 1)

            curState = newState

            // Save state to Redis
            redisService.setValue("hello-state-store", persistenceId, serializeState(curState)).subscribe()

            // Update KTable with new state
            //stateStore.put(persistenceId, newState)
            producer.send(org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", persistenceId, curState))

            command.replyTo.complete(HelloKStateResponse("Kotlin"))

        } else if (curState.state == HelloKState.ANGRY) {
            command.replyTo.complete(HelloKStateResponse("Don't talk to me!"))
        }
    }
  • 나의 상태가 Happy 일때만 반응하며 아닌경우 거부합니다.
  • Redis를 통해 새로운 상태를 저장합니다.
  • Kafka를 통해 로그성 데이터를 생산합니다.


헬로우 카운트 조회

    private fun handleGetHelloCount(command: GetHelloKtableCount) {
        command.replyTo.complete(HelloKStateCountResponse(curState.helloCount))
    }
  • 액터를 통한 상태프로그래밍에 의해 HelloCount 가 동기화가 되었기때문에~ Redis를 별도로 호출할 필요없이 이미 알고 있는 상태를 반환합니다.



Redis와 Kafka 유실없음 확인




상태있는 서비스는 기본적으로 CQRS기법을 사용하며, 상태없는 서비스여도 CQRS기법을 사용할수 있습니다. 이둘의 컨셉은 각각입니다.

CQRS

CQRS는 **쓰기(Command)**와 **읽기(Query)**를 서로 분리하여 독립적으로 설계 및 구현하는 패턴입니다.

장점

  1. 성능 최적화:

    • 읽기/쓰기 작업이 분리되므로, 각 작업에 특화된 데이터 저장소나 데이터 모델을 사용할 수 있어 성능이 향상됩니다.
    • 읽기 작업은 캐시, 읽기 전용 복제본 등을 활용하여 대규모 트래픽을 처리할 수 있습니다.
  2. 확장성:

    • 읽기와 쓰기가 독립적이므로, 시스템을 개별적으로 확장 가능. 예를 들어, 읽기 요청이 많으면 읽기 쪽만 확장(수평 확장) 가능.
  3. 복잡한 도메인 로직 지원:

    • 쓰기 모델에서 복잡한 비즈니스 로직을 구현할 수 있고, 읽기 모델은 단순히 데이터를 클라이언트가 원하는 형태로 제공하도록 설계 가능.
  4. 유연한 데이터 모델링:

    • 읽기와 쓰기에 각기 다른 데이터베이스나 데이터 구조를 사용할 수 있어, 읽기 모델을 최적화하거나 도메인 이벤트를 활용하는 등 다양한 설계를 적용 가능.
  5. 도메인 이벤트 활용:

    • Event Sourcing과 결합하면 시스템에서 발생한 모든 상태 변화를 이벤트로 기록하여 시스템 상태의 과거를 재구성하거나 추적할 수 있음.

단점

  1. 복잡성 증가:

    • 읽기와 쓰기 모델을 별도로 설계하고 구현해야 하므로 개발 및 유지보수 비용이 증가함.
    • 데이터 동기화가 까다로울 수 있음(Eventual Consistency).
  2. 개발 및 테스트 비용 증가:

    • 두 가지 모델과 관련 로직을 모두 테스트해야 하므로 더 많은 시간과 리소스가 필요함.
  3. 지연된 일관성:

    • 쓰기 작업 후 읽기 모델에 반영되기까지 지연(Latency)이 있을 수 있음(Eventual Consistency 모델 적용 시).


여기서는 상태있는 서비스개발에 CQRS의 개념을 일부 채택하였습니다. 

우리의 대부분 도메인모델을 CRUD로 해결할수 있지만~ 그 한계로인해 CQRS로 해결해야는 부분이 있을수 있습니다.


AKKA가 액터모델에 CQRS를 위해 지원하는 영속 장치는 다음과같습니다.

Journal

이벤트 소싱(Event Sourcing) 방식을 사용합니다1.
액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다1.
추가 전용(append-only) 로그 형태로 이벤트를 저장합니다1.
액터의 전체 상태 변경 이력을 보존합니다.
액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다3.

Snapshot

액터의 전체 상태를 특정 시점에 저장합니다1.
복구 시간을 최적화하기 위한 용도로 사용됩니다4.
전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다1.
Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다1.

Durable State

액터의 최신 상태만을 저장합니다

이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
상태 변경 시마다 전체 상태를 덮어씁니다.

주요 차이점:

  • 저장 방식: Journal은 이벤트 로그, Snapshot은 전체 상태의 특정 시점 복사본, Durable State는 최신 상태만 저장합니다.
  • 복구 프로세스: Journal은 모든 이벤트 재생, Snapshot은 최근 스냅샷 + 이후 이벤트 재생, Durable State는 최신 상태만 로드합니다.
  • 데이터 보존: Journal은 전체 이력 보존, Snapshot과 Durable State는 특정 시점/최신 상태만 보존합니다.
  • 사용 사례: Journal은 감사와 시간 기반 쿼리에 유용, Snapshot은 복구 최적화, Durable State는 단순한 상태 관리에 적합합니다.


코틀린 순수 액터모델을 사용해  Journal + durable State 컨셉을 직접 구현해 적용해보았으며

모델의 Value 변화에따른 이벤트 버전관리는 제외 되었으며 CQRS 완전한 컨셉은 PersistentDurableStateActor 를 통해 확인할수 있습니다.


전체 코드및 테스트코드

여기서 설명하는 전체코드를 확인할수 있으며 유닛테스트를 통해 기능확인및 성능테스트를 시도해볼수 있습니다.


이러한 상태관리 프로그래밍의 방식이 꼭 액터모델을 통해서 할수 있는것은 아니며 카프카의 Stream의 KTable을 통해서도 이러한 개념을 대체해 적용할수 있습니다.

상태관리 프로그래밍 방식이 왜 카프카에도 도입되고 CRUD에서 해결하기 어려운 성능문제 해결에 이용되고 있는지 학습해볼 필요는 있습니다.



영속성을 위한 이벤트 스토어 DDL for Postgres 용

상태프로그래밍 영속성 이벤트 스토어용으로 RDB는 R2DBC(Reactive Drive)를 통해 여전히 높은 동시성 처리 저장장치로 활용할수 있습니다.

샘플은 Akka Actor에 연결되는 영속장치의 공식 DDL문입니다. - akka 2.7 기준 호환됨을 확인

CREATE TABLE IF NOT EXISTS event_journal(
                                            slice INT NOT NULL,
                                            entity_type VARCHAR(255) NOT NULL,
    persistence_id VARCHAR(255) NOT NULL,
    seq_nr BIGINT NOT NULL,
    db_timestamp timestamp with time zone NOT NULL,

                               event_ser_id INTEGER NOT NULL,
                               event_ser_manifest VARCHAR(255) NOT NULL,
    event_payload BYTEA NOT NULL,

    deleted BOOLEAN DEFAULT FALSE NOT NULL,
    writer VARCHAR(255) NOT NULL,
    adapter_manifest VARCHAR(255),
    tags TEXT ARRAY,

    meta_ser_id INTEGER,
    meta_ser_manifest VARCHAR(255),
    meta_payload BYTEA,

    PRIMARY KEY(persistence_id, seq_nr)
    );

-- `event_journal_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr);

CREATE TABLE IF NOT EXISTS snapshot(
                                       slice INT NOT NULL,
                                       entity_type VARCHAR(255) NOT NULL,
    persistence_id VARCHAR(255) NOT NULL,
    seq_nr BIGINT NOT NULL,
    write_timestamp BIGINT NOT NULL,
    ser_id INTEGER NOT NULL,
    ser_manifest VARCHAR(255) NOT NULL,
    snapshot BYTEA NOT NULL,
    meta_ser_id INTEGER,
    meta_ser_manifest VARCHAR(255),
    meta_payload BYTEA,

    PRIMARY KEY(persistence_id)
    );

CREATE TABLE IF NOT EXISTS durable_state (
    slice INT NOT NULL,
    entity_type VARCHAR(255) NOT NULL,
    persistence_id VARCHAR(255) NOT NULL,
    revision BIGINT NOT NULL,
    db_timestamp timestamp with time zone NOT NULL,
    state_ser_id INTEGER NOT NULL,
    state_ser_manifest VARCHAR(255),
    state_payload BYTEA NOT NULL,
    tags TEXT ARRAY,

PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);

-- Primitive offset types are stored in this table.
-- If only timestamp based offsets are used this table is optional.
-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created.
CREATE TABLE IF NOT EXISTS projection_offset_store (
                                                       projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    current_offset VARCHAR(255) NOT NULL,
    manifest VARCHAR(32) NOT NULL,
    mergeable BOOLEAN NOT NULL,
    last_updated BIGINT NOT NULL,
    PRIMARY KEY(projection_name, projection_key)
    );

-- Timestamp based offsets are stored in this table.
CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
                                                                 projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    slice INT NOT NULL,
    persistence_id VARCHAR(255) NOT NULL,
    seq_nr BIGINT NOT NULL,
    -- timestamp_offset is the db_timestamp of the original event
    timestamp_offset timestamp with time zone NOT NULL,
    -- timestamp_consumed is when the offset was stored
    -- the consumer lag is timestamp_consumed - timestamp_offset
    timestamp_consumed timestamp with time zone NOT NULL,
                                     PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
    );

CREATE TABLE IF NOT EXISTS projection_management (
                                                     projection_name VARCHAR(255) NOT NULL,
    projection_key VARCHAR(255) NOT NULL,
    paused BOOLEAN NOT NULL,
    last_updated BIGINT NOT NULL,
    PRIMARY KEY(projection_name, projection_key)
    );


카프카에서 도입된 스트림을 통한 상태관리 프로그래밍 예

대량데이터 배치처리가 아닌 준실시간성 이벤트 처리를 위한 액터 상태머신 샘플


상태프로그래밍 / CQRS /  영속성  각각 다른 컨셉으로  복잡한 도메인을 처리하기위한 CQRS의 기법설명은 여기서 제외되었으며 정밀기법은 다음을 참고합니다.


Next

이와같이 구현된 컨셉을, 액터모델없이 Kafka제공 순수 스트림프로그래밍을 통해 유사하게 구현해보겠습니다.

액터모델의 경우 OOP베이스에 함수형이 필요하면 혼합할수 있는 하이브리드 방식이라고 하면 스트리밍 프로그래밍은 함수형을 주로이용합니다.







Kafka Stream


Kafka스트림을 이용해  DutableState(내구성있는 상태 서비스) 개념을 액터모델프로그래밍이 아닌 방식으로 유사하게 구현해보겠습니다.

컨셉과 다른방식으로 작성된 버전은 아래 참고


CQRS 영속성 유형

CQRS에서 다루는 영속유형은 크게 다음과 같이 분류 될수 있으며 저장장치의 특성에 따라 Databese가 목적에 맞게 채택될수 있습니다.

Journal

  • 이벤트 소싱(Event Sourcing) 방식을 사용합니다
  • 액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다
  • 추가 전용(append-only) 로그 형태로 이벤트를 저장합니다
  • 액터의 전체 상태 변경 이력을 보존합니다.
  • 액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다

Snapshot

  • 액터의 전체 상태를 특정 시점에 저장합니다
  • 복구 시간을 최적화하기 위한 용도로 사용됩니다
  • 전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다
  • Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다

Durable State

  • 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
  • CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
  • 상태 변경 시마다 전체 상태를 덮어씁니다.



KafkaStream을 이용한 CQRS패턴 구현지원


여기서는 카프카를 이용함으로   Jounal Type의 Kafka 토픽이벤트를 생성해 그것을 소비해 내부 상태프로그래밍을 통해

Redis - Dutable State로   스트림으로 즉각변환하는 코드작성을 시도해보겠습니다.

카프카는 연속성있는 이벤트를 발생시키고 메시지를 장시간 보유할때 유리하며  - DiskBase이기때문에 많은 이벤트 보유가능

Redis는 KeyBase로 마지막 상태를 저장하는것에 유리합니다. - Memory Base이기때문에 시계열데이터보다는 , Key/Value기반 마지막 상태를 다루는것이 유리하며, 영속성이 더 중요하면 RDB 채택가능



의존모듈

	// Redis
	implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")

	// Kafka
	implementation("org.apache.kafka:kafka-streams:3.4.0")
	implementation("org.apache.kafka:kafka-clients:3.4.0")



Redis

package com.example.kotlinbootlabs.service

import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {

    fun setValue(category: String, key: String, value: String): Mono<Boolean> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().set(compositeKey, value)
    }

    fun getValue(category: String, key: String): Mono<String?> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().get(compositeKey)
    }
}
  • 다양한 state를 분류하기위해 category를 1뎁스 개념만 추가


KafkaStream

data class HelloKStreamsResult(
    val streams: KafkaStreams,
    val helloKTable: KTable<String, HelloKTableState>
)

fun createHelloKStreams( redisService: RedisService): HelloKStreamsResult {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
        put("application.id", "unique-hello-ktable-actor")
        put("default.key.serde", Serdes.String().javaClass.name)
        put("default.value.serde", Serdes.String().javaClass.name)
        put("processing.guarantee", "exactly_once") // Ensure exactly-once processing
        put("session.timeout.ms", "30000") // Increase session timeout
        put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval
        put("group.instance.id", "unique-instance-id") // Enable static membership
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
        put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
        put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times
    }

    val builder = StreamsBuilder()
    val storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("hello-state-store"),
        Serdes.String(),
        Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
    )
    builder.addStateStore(storeBuilder)

    // Create a KTable from the hello-log-store topic
    val helloKTable = builder.table<String, HelloKTableState>(
        "hello-log-store",
        Consumed.with(Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())),
        Materialized.with(
            Serdes.String(),
            Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
        )
    )

    // Sync the KTable to Redis
    syncHelloKTableToRedis(helloKTable, redisService)

    val streams = KafkaStreams(builder.build(), props)

    streams.setUncaughtExceptionHandler { exception ->
        // Log the exception
        println("Uncaught exception in Kafka Streams: ${exception.message}")
        // Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
    }

    return HelloKStreamsResult(streams, helloKTable)
}

fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
    }
    return KafkaProducer(props)
}

fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
    helloKTable.toStream().foreach { key, value ->
        if (value != null) {
            println("Syncing $key to Redis - syncHelloKTableToRedis")
            redisService.setValue("hello-state-store", key, ObjectMapper().writeValueAsString(value)).subscribe()
        }
    }
}

fun <K, V> getStateStoreWithRetries(
    streams: KafkaStreams,
    storeName: String,
    maxRetries: Int = 10,
    retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
    var retries = 0
    while (retries < maxRetries) {
        try {
            return streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
            )
        } catch (e: InvalidStateStoreException) {
            println("State store is not yet ready, waiting...")
            retries++
            Thread.sleep(retryIntervalMs)
        }
    }
    throw IllegalStateException("Could not get state store after $maxRetries retries")
}
  • createHelloKStreams : 카프카 스트림을 생성하며 외부에서 start할수 있습니다.
    • syncHelloKTableToRedis : kafka의 토픽을 소비해 redis에 update하는 KTable Stream을 처리하는 코드이며, 여기서 다양한 도메인기능을 적용할수 있습니다.  
  • createKafkaProducer : topic을 생성하는 순수 생산자
  • getStateStoreWithRetries : 최초 스트림시작시 StateStore가 준비되는 시간을 기다리는 시간 ( 어플리케이션 시작시 고려 )


TEST

@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {

    private lateinit var producer: KafkaProducer<String, HelloKTableState>

    private lateinit var kstreams: HelloKStreamsResult

    @Autowired
    lateinit var redisService: RedisService

    @BeforeTest
    fun setUp() {

        producer = createKafkaProducer()

        kstreams = createHelloKStreams(redisService)

        val latch = CountDownLatch(1)

        kstreams.streams.setStateListener { newState, _ ->
            if (newState == KafkaStreams.State.RUNNING) {
                latch.countDown()
            }
        }

        kstreams.streams.start()

        latch.await()

        // Wait for the state store to be ready
        val stateStore: ReadOnlyKeyValueStore<String, HelloKTableState> = getStateStoreWithRetries(
            kstreams.streams, "hello-state-store"
        )

    }

    @AfterTest
    fun tearDown() {
        if (::producer.isInitialized) {
            producer.close(Duration.ofSeconds(3))
        }

        kstreams.streams.close()
    }

    @Test
    fun testHelloCommand() {

        var testPersistId = "testid-01"

        for(i in 1L..10L) {
            val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
            producer.send(org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", testPersistId, curState))
        }

        // Wait for the state store to be ready
        Thread.sleep(5000)
    }

}
  • setUp : 어플리케이션 시작코드에서 이용될수 있으며 , 테스트 시작전 카프카 생상자/스트림 소비자를 생성합니다.
  • testHelloCommand :  동일키로 HelloState 값을 10개 발행합니다.
    • 기대결과 : 카프카 토픽데이터 연속성데이터 10개 , Redis는 최종 State 1개값


TEST RESULT

테스트 코드에서 별도의 검증은 수행되지 않았으며 툴을 통해 데이터 유실여부및 의도대로 작동되는지 확인할수 있습니다.


여기서 설명된 전체코드는 다음을통해 확인및 수행가능합니다.



KafkaStrem

Kafka에서 KTable을 이용한 KStream 은 더 복잡하고 다양한 처리를 토픽을 소비해 실시간성 처리를 할수 있으며

긴주기의 배치가 아닌 데이터가 흘러왔을때 의도된 단위로 빠르게 처리를 해 다시 흘려보낼때 KafkaStream을 활용할수 있습니다.  












  • No labels