Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
themeEmacs
	// Redis: Spring Boot starter for the reactive Redis API
	// (supplies the ReactiveRedisTemplate that RedisService injects).
	implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")

	// Kafka: Streams DSL plus the plain producer/consumer clients, both pinned to 3.4.0.
	// NOTE(review): the two versions must stay in lockstep; if the Spring Boot BOM
	// already manages Kafka versions, confirm the explicit pin does not conflict with it.
	implementation("org.apache.kafka:kafka-streams:3.4.0")
	implementation("org.apache.kafka:kafka-clients:3.4.0")



Redis

Code Block
themeEmacs
package com.example.kotlinbootlabs.service

import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

/**
 * Reactive string key/value access to Redis, with keys namespaced by a
 * single-level category.
 *
 * Every entry is stored under the composite key `"<category>:<key>"`.
 */
@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {

    /**
     * Stores [value] under the composite key built from [category] and [key].
     *
     * @return the result of the underlying reactive `set` operation.
     */
    fun setValue(category: String, key: String, value: String): Mono<Boolean> =
        reactiveRedisTemplate.opsForValue().set(namespacedKey(category, key), value)

    /**
     * Looks up the value stored under the composite key built from [category] and [key].
     *
     * NOTE(review): presumably the returned Mono completes empty (rather than
     * emitting null) when the key is absent — confirm against Spring Data Redis.
     */
    fun getValue(category: String, key: String): Mono<String?> =
        reactiveRedisTemplate.opsForValue().get(namespacedKey(category, key))

    /** Joins category and key into the Redis key actually used for storage. */
    private fun namespacedKey(category: String, key: String): String = "$category:$key"
}
  • 다양한 state를 분류하기 위해 1뎁스(단일 단계)의 category 개념만 추가함 (Redis 키는 "category:key" 형식으로 저장됨)


KafkaStream

Code Block
themeEmacs
/**
 * Pair returned by [createHelloKStreams].
 *
 * @property streams the Kafka Streams client built from the topology; the
 *   factory does not start it.
 * @property helloKTable the KTable declared over the "hello-log-store" topic.
 */
data class HelloKStreamsResult(
    val streams: KafkaStreams,
    val helloKTable: KTable<String, HelloKTableState>,
)

/**
 * Builds (but does not start) a Kafka Streams client that reads the
 * "hello-log-store" topic into a KTable and mirrors every update to Redis.
 *
 * The KTable is materialized as the persistent key-value store
 * "hello-state-store", so the table's contents can be queried under that name
 * (for example via [getStateStoreWithRetries]) once the client is running.
 * Previously a store with that name was registered with `addStateStore` but
 * never connected to any processor, while the KTable itself used an anonymous
 * store — so the named store never held the table's data.
 *
 * @param redisService destination for the Redis mirror of the KTable.
 * @return the un-started [KafkaStreams] client together with the KTable.
 */
fun createHelloKStreams(redisService: RedisService): HelloKStreamsResult {
    val props = Properties().apply {
        // NOTE(review): ports 9003/9004 look inconsistent with 9092 — confirm the broker list.
        put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
        put("application.id", "unique-hello-ktable-actor")
        put("default.key.serde", Serdes.String().javaClass.name)
        put("default.value.serde", Serdes.String().javaClass.name)
        // NOTE(review): "exactly_once" is deprecated in Kafka Streams 3.x in favour of
        // "exactly_once_v2" (needs brokers >= 2.5) — confirm broker version before switching.
        put("processing.guarantee", "exactly_once") // Ensure exactly-once processing
        put("session.timeout.ms", "30000") // Increase session timeout
        put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval
        put("group.instance.id", "unique-instance-id") // Enable static membership
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
        put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
        put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times
    }

    // One serde for the state value, shared by the topic consumer and the store.
    val stateSerde = Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())

    val builder = StreamsBuilder()

    // Create a KTable from the hello-log-store topic, backed by the named,
    // queryable store "hello-state-store".
    val helloKTable = builder.table<String, HelloKTableState>(
        "hello-log-store",
        Consumed.with(Serdes.String(), stateSerde),
        Materialized.`as`<String, HelloKTableState>(Stores.persistentKeyValueStore("hello-state-store"))
            .withKeySerde(Serdes.String())
            .withValueSerde(stateSerde),
    )

    // Sync the KTable to Redis (must be wired before the topology is built).
    syncHelloKTableToRedis(helloKTable, redisService)

    val streams = KafkaStreams(builder.build(), props)

    streams.setUncaughtExceptionHandler { exception ->
        // Log the exception
        println("Uncaught exception in Kafka Streams: ${exception.message}")
        // Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
    }

    return HelloKStreamsResult(streams, helloKTable)
}

/**
 * Creates a producer for `String` keys and [HelloKTableState] values,
 * connected to the broker at localhost:9092.
 *
 * The caller owns the returned producer and is responsible for closing it.
 */
fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
    val producerConfig = Properties()
    producerConfig[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    producerConfig[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    producerConfig[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = HelloKTableStateSerializer::class.java.name
    return KafkaProducer(producerConfig)
}

/**
 * Mirrors every non-null update of [helloKTable] into Redis.
 *
 * Each record is serialized to JSON and written through [redisService] under
 * the category "hello-state-store" with the record key. Records with a null
 * value are skipped, so nothing is removed from Redis for them.
 *
 * The Redis write is fire-and-forget: it is subscribed to without waiting for
 * completion. A failed write is reported on stdout instead of being dropped
 * silently, and does not stop the stream.
 *
 * @param helloKTable source table whose change stream is mirrored.
 * @param redisService destination for the serialized state.
 */
fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
    // Built once and reused for every record instead of allocating a mapper per update.
    val jsonMapper = ObjectMapper()

    helloKTable.toStream().foreach { key, value ->
        if (value != null) {
            println("Syncing $key to Redis - syncHelloKTableToRedis")
            redisService.setValue("hello-state-store", key, jsonMapper.writeValueAsString(value))
                .subscribe(
                    { _ -> },
                    { error -> println("Failed to sync $key to Redis: ${error.message}") },
                )
        }
    }
}

/**
 * Fetches a read-only key-value store from [streams], retrying while the
 * store lookup throws [InvalidStateStoreException].
 *
 * The calling thread is blocked with `Thread.sleep` between attempts. No sleep
 * happens after the final failed attempt, since nothing would follow it.
 *
 * @param streams the Kafka Streams client to query.
 * @param storeName name of the key-value store to look up.
 * @param maxRetries maximum number of lookup attempts.
 * @param retryIntervalMs pause between two consecutive attempts, in milliseconds.
 * @return the read-only view of the store.
 * @throws IllegalStateException if every attempt failed; the last
 *   [InvalidStateStoreException] (if any) is attached as the cause.
 */
fun <K, V> getStateStoreWithRetries(
    streams: KafkaStreams,
    storeName: String,
    maxRetries: Int = 10,
    retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
    var lastFailure: InvalidStateStoreException? = null
    for (attempt in 1..maxRetries) {
        try {
            return streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
            )
        } catch (e: InvalidStateStoreException) {
            lastFailure = e
            println("State store is not yet ready, waiting...")
            // Only wait when another attempt will actually follow.
            if (attempt < maxRetries) {
                Thread.sleep(retryIntervalMs)
            }
        }
    }
    throw IllegalStateException("Could not get state store after $maxRetries retries", lastFailure)
}



TEST

Code Block
themeEmacs
/**
 * Integration test for the hello KTable pipeline.
 *
 * Produces state records to the "hello-log-store" topic and checks that the
 * stream's Redis sync has written an entry for the produced key.
 *
 * NOTE(review): needs reachable Kafka brokers and a Redis instance matching the
 * addresses configured in the factories — confirm the test environment.
 */
@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {

    private lateinit var producer: KafkaProducer<String, HelloKTableState>

    private lateinit var kstreams: HelloKStreamsResult

    @Autowired
    lateinit var redisService: RedisService

    /** Creates the producer and the streams client, then blocks until the client is RUNNING. */
    @BeforeTest
    fun setUp() {
        producer = createKafkaProducer()
        kstreams = createHelloKStreams(redisService)

        val running = CountDownLatch(1)

        // The listener must be registered before start() so the RUNNING transition is not missed.
        kstreams.streams.setStateListener { newState, _ ->
            if (newState == KafkaStreams.State.RUNNING) {
                running.countDown()
            }
        }

        kstreams.streams.start()

        // Bounded wait: an unreachable broker fails the test instead of hanging it forever.
        check(running.await(60, java.util.concurrent.TimeUnit.SECONDS)) {
            "Kafka Streams did not reach RUNNING within 60 seconds"
        }

        // Wait for the state store to be ready; the handle itself is not needed here.
        getStateStoreWithRetries<String, HelloKTableState>(kstreams.streams, "hello-state-store")
    }

    /** Releases whatever setUp managed to create, even if it failed part-way. */
    @AfterTest
    fun tearDown() {
        if (::producer.isInitialized) {
            producer.close(Duration.ofSeconds(3))
        }

        // Guarded like the producer so a failed setUp is not masked by an
        // UninitializedPropertyAccessException here.
        if (::kstreams.isInitialized) {
            kstreams.streams.close()
        }
    }

    @Test
    fun testHelloCommand() {
        val testPersistId = "testid-01"

        for (i in 1L..10L) {
            val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
            producer.send(org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", testPersistId, curState))
        }
        // send() is asynchronous; push the batch out before waiting on the stream.
        producer.flush()

        // Give the stream time to consume the records and sync them to Redis.
        Thread.sleep(5000)

        kotlin.test.assertNotNull(
            redisService.getValue("hello-state-store", testPersistId).block(Duration.ofSeconds(5)),
            "Expected the KTable sync to have written $testPersistId to Redis",
        )
    }

}