Page History
...
CQRS
...
영속성 유형
CQRS에서 다루는 영속장치는 영속유형은 크게 다음과 같이 분류 될수 있으며 저장장치의 특성에 따라 Databese가 목적에 맞게 채택될수 있습니다.
...
- 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
- CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
- 상태 변경 시마다 전체 상태를 덮어씁니다.
KafkaStream을 이용한 CQRS패턴 구현지원
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
여기서는 카프카를 이용함으로 Jounal Type의 Kafka 토픽이벤트를 생성해 그것을 소비해 내부 상태프로그래밍을 통해
...
Code Block | ||
---|---|---|
| ||
data class HelloKStreamsResult( val streams: KafkaStreams, val helloKTable: KTable<String, HelloKTableState> ) fun createHelloKStreams( redisService: RedisService): HelloKStreamsResult { val props = Properties().apply { put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004") put("application.id", "unique-hello-ktable-actor") put("default.key.serde", Serdes.String().javaClass.name) put("default.value.serde", Serdes.String().javaClass.name) put("processing.guarantee", "exactly_once") // Ensure exactly-once processing put("session.timeout.ms", "30000") // Increase session timeout put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval put("group.instance.id", "unique-instance-id") // Enable static membership put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name) put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name) put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times } val builder = StreamsBuilder() val storeBuilder = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("hello-state-store"), Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer()) ) builder.addStateStore(storeBuilder) // Create a KTable from the hello-log-store topic val helloKTable = builder.table<String, HelloKTableState>( "hello-log-store", Consumed.with(Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())), Materialized.with( Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer()) ) ) // Sync the KTable to Redis syncHelloKTableToRedis(helloKTable, redisService) val streams = KafkaStreams(builder.build(), props) streams.setUncaughtExceptionHandler { exception -> // Log the exception println("Uncaught exception in Kafka Streams: ${exception.message}") // Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT } return HelloKStreamsResult(streams, helloKTable) } fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> { val props = Properties().apply { put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name) put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name) } return KafkaProducer(props) } fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) { helloKTable.toStream().foreach { key, value -> if (value != null) { println("Syncing $key to Redis - syncHelloKTableToRedis") redisService.setValue("hello-state-store", key, ObjectMapper().writeValueAsString(value)).subscribe() } } } fun <K, V> getStateStoreWithRetries( streams: KafkaStreams, storeName: String, maxRetries: Int = 10, retryIntervalMs: Long = 1000 ): ReadOnlyKeyValueStore<K, V> { var retries = 0 while (retries < maxRetries) { try { return streams.store( StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>()) ) } catch (e: InvalidStateStoreException) { println("State store is not yet ready, waiting...") retries++ Thread.sleep(retryIntervalMs) } } throw IllegalStateException("Could not get state store after $maxRetries retries") } |
- createHelloKStreams : hello-log-store 토픽을 소비해 KTable(내부상태 스트림 프로그래밍) 을 이용 redis에 update합니다카프카 스트림을 생성하며 외부에서 start할수 있습니다.
- syncHelloKTableToRedis : kafka의 토픽을 소비해 redis에 update하는 코드KTable Stream을 처리하는 코드이며, 여기서 다양한 도메인기능을 적용할수 있습니다.
- createKafkaProducer : topic을 생성하는 순수 생산자
- getStateStoreWithRetries : 최초 스트림시작시 StateStore가 준비되는 시간을 기다리는 시간 ( 어플리케이션 시작시 고려 )
...
KafkaStrem
Kafka에서 KTable을 이용한 KStream 은 더 복잡하고 다양한 처리를 토픽을 소비해 실시간성 처리를 할수 있으며
긴주기의 배치가 아닌 데이터가 흘러왔을때 의도된 단위로 빠르게 처리를 해 다시 흘려보낼때 KafkaStream을 활용할수 있습니다.