Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...


CQRS

...

영속성 유형

CQRS에서 다루는 영속장치는 영속유형은 크게 다음과 같이 분류 될수 있으며 저장장치의 특성에 따라 Databese가 목적에 맞게 채택될수 있습니다.

...

  • 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
  • CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
  • 상태 변경 시마다 전체 상태를 덮어씁니다.



KafkaStream을 이용한 CQRS패턴 구현지원

draw.io Board Diagram
bordertrue
diagramNamekakastream-state
simpleViewerfalse
width
linksauto
tbstyletop
lboxtrue
diagramWidth1080
revision1

Image Added


여기서는 카프카를 이용함으로   Jounal Type의 Kafka 토픽이벤트를 생성해 그것을 소비해 내부 상태프로그래밍을 통해

...

Code Block
themeEmacs
data class HelloKStreamsResult(
    val streams: KafkaStreams,
    val helloKTable: KTable<String, HelloKTableState>
)

fun createHelloKStreams( redisService: RedisService): HelloKStreamsResult {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
        put("application.id", "unique-hello-ktable-actor")
        put("default.key.serde", Serdes.String().javaClass.name)
        put("default.value.serde", Serdes.String().javaClass.name)
        put("processing.guarantee", "exactly_once") // Ensure exactly-once processing
        put("session.timeout.ms", "30000") // Increase session timeout
        put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval
        put("group.instance.id", "unique-instance-id") // Enable static membership
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
        put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
        put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times
    }

    val builder = StreamsBuilder()
    val storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("hello-state-store"),
        Serdes.String(),
        Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
    )
    builder.addStateStore(storeBuilder)

    // Create a KTable from the hello-log-store topic
    val helloKTable = builder.table<String, HelloKTableState>(
        "hello-log-store",
        Consumed.with(Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())),
        Materialized.with(
            Serdes.String(),
            Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
        )
    )

    // Sync the KTable to Redis
    syncHelloKTableToRedis(helloKTable, redisService)

    val streams = KafkaStreams(builder.build(), props)

    streams.setUncaughtExceptionHandler { exception ->
        // Log the exception
        println("Uncaught exception in Kafka Streams: ${exception.message}")
        // Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
    }

    return HelloKStreamsResult(streams, helloKTable)
}

fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
    }
    return KafkaProducer(props)
}

fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
    helloKTable.toStream().foreach { key, value ->
        if (value != null) {
            println("Syncing $key to Redis - syncHelloKTableToRedis")
            redisService.setValue("hello-state-store", key, ObjectMapper().writeValueAsString(value)).subscribe()
        }
    }
}

fun <K, V> getStateStoreWithRetries(
    streams: KafkaStreams,
    storeName: String,
    maxRetries: Int = 10,
    retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
    var retries = 0
    while (retries < maxRetries) {
        try {
            return streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
            )
        } catch (e: InvalidStateStoreException) {
            println("State store is not yet ready, waiting...")
            retries++
            Thread.sleep(retryIntervalMs)
        }
    }
    throw IllegalStateException("Could not get state store after $maxRetries retries")
}
  • createHelloKStreams : hello-log-store 토픽을 소비해  KTable(내부상태 스트림 프로그래밍) 을 이용 redis에 update합니다카프카 스트림을 생성하며 외부에서 start할수 있습니다.
    • syncHelloKTableToRedis : kafka의 토픽을 소비해 redis에 update하는 코드KTable Stream을 처리하는 코드이며, 여기서 다양한 도메인기능을 적용할수 있습니다.  
  • createKafkaProducer : topic을 생성하는 순수 생산자
  • getStateStoreWithRetries : 최초 스트림시작시 StateStore가 준비되는 시간을 기다리는 시간 ( 어플리케이션 시작시 고려 )

...



KafkaStrem

Kafka에서 KTable을 이용한 KStream 은 더 복잡하고 다양한 처리를 토픽을 소비해 실시간성 처리를 할수 있으며

긴주기의 배치가 아닌 데이터가 흘러왔을때 의도된 단위로 빠르게 처리를 해 다시 흘려보낼때 KafkaStream을 활용할수 있습니다.  

Image Added