You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Kafka스트림을 이용해  DutableState(내구성있는 상태 서비스) 개념을 액터모델프로그래밍이 아닌 방식으로 유사하게 구현해보겠습니다.

컨셉과 다른방식으로 작성된 버전은 아래 참고



CQRS 영속성장치

CQRS에서 다루는 영속장치는 크게 다음과 같이 분류 될수 있으며 저장장치의 특성에 따라 Databese가 목적에 맞게 채택될수 있습니다.

Journal

  • 이벤트 소싱(Event Sourcing) 방식을 사용합니다
  • 액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다
  • 추가 전용(append-only) 로그 형태로 이벤트를 저장합니다
  • 액터의 전체 상태 변경 이력을 보존합니다.
  • 액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다

Snapshot

  • 액터의 전체 상태를 특정 시점에 저장합니다
  • 복구 시간을 최적화하기 위한 용도로 사용됩니다
  • 전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다
  • Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다

Durable State

  • 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
  • CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
  • 상태 변경 시마다 전체 상태를 덮어씁니다.




여기서는 카프카를 이용함으로   Jounal Type의 Kafka 토픽이벤트를 생성해 그것을 소비해 내부 상태프로그래밍을 통해

Redis - Dutable State로   스트림으로 즉각변환하는 코드작성을 시도해보겠습니다.

카프카는 연속성있는 이벤트를 발생시키고 메시지를 장시간 보유할때 유리하며  - DiskBase이기때문에 많은 이벤트 보유가능

Redis는 KeyBase로 마지막 상태를 저장하는것에 유리합니다. - Memory Base이기때문에 시계열데이터보다는 , Key/Value기반 마지막 상태를 다루는것이 유리하며, 영속성이 더 중요하면 RDB 채택가능



의존모듈

	// Redis
	implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")

	// Kafka
	implementation("org.apache.kafka:kafka-streams:3.4.0")
	implementation("org.apache.kafka:kafka-clients:3.4.0")



Redis

package com.example.kotlinbootlabs.service

import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {

    fun setValue(category: String, key: String, value: String): Mono<Boolean> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().set(compositeKey, value)
    }

    fun getValue(category: String, key: String): Mono<String?> {
        val compositeKey = "$category:$key"
        return reactiveRedisTemplate.opsForValue().get(compositeKey)
    }
}
  • 다양한 state를 분류하기위해 category를 1뎁스 개념만 추가됨


KafkaStream

data class HelloKStreamsResult(
    val streams: KafkaStreams,
    val helloKTable: KTable<String, HelloKTableState>
)

fun createHelloKStreams( redisService: RedisService): HelloKStreamsResult {
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
        put("application.id", "unique-hello-ktable-actor")
        put("default.key.serde", Serdes.String().javaClass.name)
        put("default.value.serde", Serdes.String().javaClass.name)
        put("processing.guarantee", "exactly_once") // Ensure exactly-once processing
        put("session.timeout.ms", "30000") // Increase session timeout
        put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval
        put("group.instance.id", "unique-instance-id") // Enable static membership
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
        put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
        put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times
    }

    val builder = StreamsBuilder()
    val storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("hello-state-store"),
        Serdes.String(),
        Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
    )
    builder.addStateStore(storeBuilder)

    // Create a KTable from the hello-log-store topic
    val helloKTable = builder.table<String, HelloKTableState>(
        "hello-log-store",
        Consumed.with(Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())),
        Materialized.with(
            Serdes.String(),
            Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
        )
    )

    // Sync the KTable to Redis
    syncHelloKTableToRedis(helloKTable, redisService)

    val streams = KafkaStreams(builder.build(), props)

    streams.setUncaughtExceptionHandler { exception ->
        // Log the exception
        println("Uncaught exception in Kafka Streams: ${exception.message}")
        // Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
    }

    return HelloKStreamsResult(streams, helloKTable)
}

fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
    }
    return KafkaProducer(props)
}

fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
    helloKTable.toStream().foreach { key, value ->
        if (value != null) {
            println("Syncing $key to Redis - syncHelloKTableToRedis")
            redisService.setValue("hello-state-store", key, ObjectMapper().writeValueAsString(value)).subscribe()
        }
    }
}

fun <K, V> getStateStoreWithRetries(
    streams: KafkaStreams,
    storeName: String,
    maxRetries: Int = 10,
    retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
    var retries = 0
    while (retries < maxRetries) {
        try {
            return streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
            )
        } catch (e: InvalidStateStoreException) {
            println("State store is not yet ready, waiting...")
            retries++
            Thread.sleep(retryIntervalMs)
        }
    }
    throw IllegalStateException("Could not get state store after $maxRetries retries")
}



TEST

@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {

    private lateinit var producer: KafkaProducer<String, HelloKTableState>

    private lateinit var kstreams: HelloKStreamsResult

    @Autowired
    lateinit var redisService: RedisService

    @BeforeTest
    fun setUp() {

        producer = createKafkaProducer()

        kstreams = createHelloKStreams(redisService)

        val latch = CountDownLatch(1)

        kstreams.streams.setStateListener { newState, _ ->
            if (newState == KafkaStreams.State.RUNNING) {
                latch.countDown()
            }
        }

        kstreams.streams.start()

        latch.await()

        // Wait for the state store to be ready
        val stateStore: ReadOnlyKeyValueStore<String, HelloKTableState> = getStateStoreWithRetries(
            kstreams.streams, "hello-state-store"
        )

    }

    @AfterTest
    fun tearDown() {
        if (::producer.isInitialized) {
            producer.close(Duration.ofSeconds(3))
        }

        kstreams.streams.close()
    }

    @Test
    fun testHelloCommand() {

        var testPersistId = "testid-01"

        for(i in 1L..10L) {
            val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
            producer.send(org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", testPersistId, curState))
        }

        // Wait for the state store to be ready
        Thread.sleep(5000)
    }

}
















  • No labels