Page History
...
- 다양한 state를 분류하기 위해 category 개념을 1뎁스로만 추가함
KafkaStream
Code Block | ||
---|---|---|
| ||
/**
 * Bundles the (not-yet-started) Kafka Streams topology with the KTable it
 * materializes, so callers can both run the streams and query the table.
 */
data class HelloKStreamsResult(
    val streams: KafkaStreams,
    val helloKTable: KTable<String, HelloKTableState>
)

/**
 * Builds the "hello" Kafka Streams topology:
 *  - a persistent key-value store named "hello-state-store",
 *  - a KTable sourced from the "hello-log-store" topic,
 *  - a side effect that mirrors every KTable update into Redis.
 *
 * @param redisService destination for the latest state per key.
 * @return the streams instance (caller must start/close it) plus the KTable.
 */
fun createHelloKStreams(redisService: RedisService): HelloKStreamsResult {
    val props = Properties().apply {
        // NOTE(review): ports 9003/9004 look like typos for 9093/9094 — confirm broker addresses.
        put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
        put("application.id", "unique-hello-ktable-actor")
        put("default.key.serde", Serdes.String().javaClass.name)
        put("default.value.serde", Serdes.String().javaClass.name)
        put("processing.guarantee", "exactly_once")    // Ensure exactly-once processing
        put("session.timeout.ms", "30000")             // Increase session timeout
        put("heartbeat.interval.ms", "10000")          // Adjust heartbeat interval
        put("group.instance.id", "unique-instance-id") // Enable static membership
        // NOTE(review): ProducerConfig serializer keys are ignored by Kafka Streams
        // (it configures producers from serdes); kept to preserve existing behavior,
        // but they can likely be removed — confirm.
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
        put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
        put(ProducerConfig.RETRIES_CONFIG, 3)  // Retry up to 3 times
    }

    val builder = StreamsBuilder()

    // Build the HelloKTableState serde once instead of three separate times.
    val stateSerde = Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())

    val storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("hello-state-store"),
        Serdes.String(),
        stateSerde
    )
    builder.addStateStore(storeBuilder)

    // Create a KTable from the hello-log-store topic.
    val helloKTable = builder.table<String, HelloKTableState>(
        "hello-log-store",
        Consumed.with(Serdes.String(), stateSerde),
        Materialized.with(Serdes.String(), stateSerde)
    )

    // Mirror every KTable update into Redis.
    syncHelloKTableToRedis(helloKTable, redisService)

    val streams = KafkaStreams(builder.build(), props)
    streams.setUncaughtExceptionHandler { exception ->
        // Log the exception.
        println("Uncaught exception in Kafka Streams: ${exception.message}")
        // Possible responses: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION.
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
    }
    return HelloKStreamsResult(streams, helloKTable)
}

/**
 * Creates a plain Kafka producer publishing HelloKTableState values keyed by
 * String (used by tests / seed code to feed the "hello-log-store" topic).
 */
fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
    }
    return KafkaProducer(props)
}

/**
 * Forwards every non-null KTable update to Redis under the
 * "hello-state-store" namespace, serialized as JSON.
 */
fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
    // Hoisted out of the per-record lambda: the original allocated a new
    // ObjectMapper for every update, which is needlessly expensive.
    val objectMapper = ObjectMapper()
    helloKTable.toStream().foreach { key, value ->
        if (value != null) {
            println("Syncing $key to Redis - syncHelloKTableToRedis")
            redisService.setValue("hello-state-store", key, objectMapper.writeValueAsString(value)).subscribe()
        }
    }
}

/**
 * Fetches a read-only view of [storeName], retrying while the store is still
 * restoring/rebalancing.
 *
 * @param maxRetries maximum number of attempts before giving up.
 * @param retryIntervalMs sleep between attempts.
 * @throws IllegalStateException when the store never becomes queryable.
 */
fun <K, V> getStateStoreWithRetries(
    streams: KafkaStreams,
    storeName: String,
    maxRetries: Int = 10,
    retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
    var retries = 0
    while (retries < maxRetries) {
        try {
            return streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
            )
        } catch (e: InvalidStateStoreException) {
            println("State store is not yet ready, waiting...")
            retries++
            Thread.sleep(retryIntervalMs)
        }
    }
    throw IllegalStateException("Could not get state store after $maxRetries retries")
}
|
...
Code Block | ||
---|---|---|
| ||
@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {

    private lateinit var producer: KafkaProducer<String, HelloKTableState>
    private lateinit var kstreams: HelloKStreamsResult

    @Autowired
    lateinit var redisService: RedisService

    /**
     * Creates the producer and streams topology, starts the streams, and
     * blocks until the topology reports RUNNING and the state store answers
     * queries. Mirrors what application startup code would do.
     */
    @BeforeTest
    fun setUp() {
        producer = createKafkaProducer()
        kstreams = createHelloKStreams(redisService)

        // Block until the streams instance transitions to RUNNING.
        val latch = CountDownLatch(1)
        kstreams.streams.setStateListener { newState, _ ->
            if (newState == KafkaStreams.State.RUNNING) {
                latch.countDown()
            }
        }
        kstreams.streams.start()
        latch.await()

        // Wait for the state store to be ready. The returned handle is used
        // only as a readiness probe (the original bound it to an unused local).
        getStateStoreWithRetries<String, HelloKTableState>(
            kstreams.streams,
            "hello-state-store"
        )
    }

    @AfterTest
    fun tearDown() {
        if (::producer.isInitialized) {
            producer.close(Duration.ofSeconds(3))
        }
        kstreams.streams.close()
    }

    /**
     * Publishes ten HelloKTableState events under a single key. Expected
     * outcome (verified externally via tooling, not asserted here): 10
     * sequential records on the topic, and one final state value in Redis.
     */
    @Test
    fun testHelloCommand() {
        val testPersistId = "testid-01" // was `var`; never reassigned
        for (i in 1L..10L) {
            val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
            producer.send(
                org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", testPersistId, curState)
            )
        }
        // Give the streams topology and Redis sync time to process before tearDown.
        Thread.sleep(5000)
    }
}
|
- setUp : 어플리케이션 시작 코드에서 이용될 수 있으며, 테스트 시작 전 카프카 생산자/스트림 소비자를 생성합니다.
- testHelloCommand : 동일키로 HelloState 값을 10개 발행합니다.
- 기대결과 : 카프카 토픽에는 연속성 있는 데이터 10개, Redis에는 최종 State 값 1개
TEST RESULT
테스트 코드에서 별도의 검증은 수행되지 않았으며, 툴을 통해 데이터 유실 여부 및 의도대로 작동되는지 확인할 수 있습니다.
여기서 설명된 전체 코드는 다음을 통해 확인 및 수행 가능합니다.