```kotlin
// build.gradle.kts
dependencies {
    // Redis
    implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")

    // Kafka
    implementation("org.apache.kafka:kafka-streams:3.4.0")
    implementation("org.apache.kafka:kafka-clients:3.4.0")
}
```
Redis
```kotlin
package com.example.kotlinbootlabs.service

import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {

    // Store a value under a "category:key" composite key
    fun setValue(category: String, key: String, value: String): Mono<Boolean> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().set(compositeKey, value)
    }

    // Read a value back by its "category:key" composite key
    fun getValue(category: String, key: String): Mono<String?> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().get(compositeKey)
    }
}
```
- A one-level category prefix is added to the key so that different kinds of state can be grouped and classified (see the configuration and usage sketch below).
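The service injects a `ReactiveRedisTemplate<String, String>`. Depending on the Spring Boot version, the auto-configured `ReactiveStringRedisTemplate` may already satisfy this injection point; if not, a minimal explicit bean could look like the sketch below (the `RedisConfig` class name is an assumption for illustration, not part of the original code):

```kotlin
package com.example.kotlinbootlabs.config // hypothetical package for this sketch

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext
import org.springframework.data.redis.serializer.StringRedisSerializer

@Configuration
class RedisConfig {

    // Build a String-to-String template on top of the auto-configured connection factory
    @Bean
    fun reactiveRedisTemplate(factory: ReactiveRedisConnectionFactory): ReactiveRedisTemplate<String, String> {
        val serializer = StringRedisSerializer()
        val context = RedisSerializationContext
            .newSerializationContext<String, String>(serializer)
            .value(serializer)
            .build()
        return ReactiveRedisTemplate(factory, context)
    }
}
```

With this in place, state can be written per category, e.g. `redisService.setValue("hello-state-store", "testid-01", json).subscribe()`, and read back with `redisService.getValue("hello-state-store", "testid-01")`.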
KafkaStream
```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StoreQueryParameters
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
import org.apache.kafka.streams.state.Stores
import java.util.Properties

data class HelloKStreamsResult(
    val streams: KafkaStreams,
    val helloKTable: KTable<String, HelloKTableState>
)

fun createHelloKStreams(redisService: RedisService): HelloKStreamsResult {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
        put("application.id", "unique-hello-ktable-actor")
        put("default.key.serde", Serdes.String().javaClass.name)
        put("default.value.serde", Serdes.String().javaClass.name)
        put("processing.guarantee", "exactly_once") // Ensure exactly-once processing
        put("session.timeout.ms", "30000") // Increase session timeout
        put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval
        put("group.instance.id", "unique-instance-id") // Enable static membership
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
        put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
        put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times
    }

    val builder = StreamsBuilder()

    // Persistent key-value store backing the KTable state
    val storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("hello-state-store"),
        Serdes.String(),
        Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
    )
    builder.addStateStore(storeBuilder)

    // Create a KTable from the hello-log-store topic
    val helloKTable = builder.table<String, HelloKTableState>(
        "hello-log-store",
        Consumed.with(Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())),
        Materialized.with(
            Serdes.String(),
            Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
        )
    )

    // Sync the KTable to Redis
    syncHelloKTableToRedis(helloKTable, redisService)

    val streams = KafkaStreams(builder.build(), props)

    streams.setUncaughtExceptionHandler { exception ->
        // Log the exception
        println("Uncaught exception in Kafka Streams: ${exception.message}")
        // Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
    }

    return HelloKStreamsResult(streams, helloKTable)
}

fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
    }
    return KafkaProducer(props)
}

// Forward every KTable update to Redis under the "hello-state-store" category
fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
    helloKTable.toStream().foreach { key, value ->
        if (value != null) {
            println("Syncing $key to Redis - syncHelloKTableToRedis")
            redisService.setValue("hello-state-store", key, ObjectMapper().writeValueAsString(value)).subscribe()
        }
    }
}

// Query a state store, retrying until the streams instance has finished restoring it
fun <K, V> getStateStoreWithRetries(
    streams: KafkaStreams,
    storeName: String,
    maxRetries: Int = 10,
    retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
    var retries = 0
    while (retries < maxRetries) {
        try {
            return streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
            )
        } catch (e: InvalidStateStoreException) {
            println("State store is not yet ready, waiting...")
            retries++
            Thread.sleep(retryIntervalMs)
        }
    }
    throw IllegalStateException("Could not get state store after $maxRetries retries")
}
```
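For context, a minimal wiring sketch for starting this topology inside a Spring Boot application is shown below. The `HelloStreamsRunner` component and the use of `ApplicationRunner` are assumptions for illustration, not part of the original code:

```kotlin
import org.springframework.boot.ApplicationArguments
import org.springframework.boot.ApplicationRunner
import org.springframework.stereotype.Component

// Hypothetical startup component: builds the topology once and starts it
@Component
class HelloStreamsRunner(private val redisService: RedisService) : ApplicationRunner {

    override fun run(args: ApplicationArguments?) {
        val result = createHelloKStreams(redisService)
        result.streams.start()

        // Close the streams instance cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(Thread { result.streams.close() })
    }
}
```

Once the streams instance reaches RUNNING, every update on the `hello-log-store` topic flows through `syncHelloKTableToRedis` and becomes readable via `redisService.getValue("hello-state-store", key)`.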
TEST
```kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.context.junit.jupiter.SpringExtension
import java.time.Duration
import java.util.concurrent.CountDownLatch
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test

@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {

    private lateinit var producer: KafkaProducer<String, HelloKTableState>
    private lateinit var kstreams: HelloKStreamsResult

    @Autowired
    lateinit var redisService: RedisService

    @BeforeTest
    fun setUp() {
        producer = createKafkaProducer()
        kstreams = createHelloKStreams(redisService)

        // Block until the streams instance reaches RUNNING
        val latch = CountDownLatch(1)
        kstreams.streams.setStateListener { newState, _ ->
            if (newState == KafkaStreams.State.RUNNING) {
                latch.countDown()
            }
        }
        kstreams.streams.start()
        latch.await()

        // Wait for the state store to be ready
        val stateStore: ReadOnlyKeyValueStore<String, HelloKTableState> = getStateStoreWithRetries(
            kstreams.streams, "hello-state-store"
        )
    }

    @AfterTest
    fun tearDown() {
        if (::producer.isInitialized) {
            producer.close(Duration.ofSeconds(3))
        }
        kstreams.streams.close()
    }

    @Test
    fun testHelloCommand() {
        val testPersistId = "testid-01"
        for (i in 1L..10L) {
            val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
            producer.send(ProducerRecord("hello-log-store", testPersistId, curState))
        }
        // Give the topology time to consume the records and sync them to Redis
        Thread.sleep(5000)
    }
}
```
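The test only produces records and never checks the outcome. A hedged sketch of a follow-up assertion that could be appended after the `Thread.sleep(5000)` call, assuming `kotlin.test.assertNotNull` is imported and the pause is long enough for the Redis sync to complete:

```kotlin
// Hypothetical verification appended to testHelloCommand:
// read back the JSON that syncHelloKTableToRedis wrote for this key.
val json = redisService.getValue("hello-state-store", testPersistId).block(Duration.ofSeconds(5))
println("State synced to Redis: $json")

// The KTable keeps only the latest value per key, so this should be the
// JSON form of the last HelloKTableState produced in the loop.
assertNotNull(json)
```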