Page History
...
CQRS 영속성 유형
CQRS에서 다루는 영속장치는 영속유형은 크게 다음과 같이 분류 될수 있으며 시계열 이벤트인 Jounal Type의 Kafka 토픽이벤트를 생성해 그것을 소비해 내부 상태프로그래밍을 통해Redis Dutable State로 스트림으로 즉각변환하는 코드작성을 시도해보겠습니다저장장치의 특성에 따라 Databese가 목적에 맞게 채택될수 있습니다.
Journal
- 이벤트 소싱(Event Sourcing) 방식을 사용합니다
- 액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다
- 추가 전용(append-only) 로그 형태로 이벤트를 저장합니다
- 액터의 전체 상태 변경 이력을 보존합니다.
- 액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다
Snapshot
- 액터의 전체 상태를 특정 시점에 저장합니다
- 복구 시간을 최적화하기 위한 용도로 사용됩니다
- 전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다
- Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다
Durable State
- 액터의 최신 상태만을 저장합니다4. 이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
- CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
- 상태 변경 시마다 전체 상태를 덮어씁니다.
KafkaStream을 이용한 CQRS패턴 구현지원
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
여기서는 카프카를 이용함으로 Jounal Type의 Kafka 토픽이벤트를 생성해 그것을 소비해 내부 상태프로그래밍을 통해
Redis - Dutable State로 스트림으로 즉각변환하는 코드작성을 시도해보겠습니다.
카프카는 연속성있는 이벤트를 발생시키고 메시지를 장시간 보유할때 유리하며 - DiskBase이기때문에 많은 이벤트 보유가능
Redis는 KeyBase로 마지막 상태를 저장하는것에 유리합니다. - Memory Base이기때문에 시계열데이터보다는 , Key/Value기반 마지막 상태를 다루는것이 유리하며, 영속성이 더 중요하면 RDB 채택가능
의존모듈
Code Block | ||
---|---|---|
| ||
// Redis
implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
// Kafka
implementation("org.apache.kafka:kafka-streams:3.4.0")
implementation("org.apache.kafka:kafka-clients:3.4.0") |
Redis
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.service
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono
@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {
fun setValue(category: String, key: String, value: String): Mono<Boolean> {
val compositeKey = "$category:$key"
return reactiveRedisTemplate.opsForValue().set(compositeKey, value)
}
fun getValue(category: String, key: String): Mono<String?> {
val compositeKey = "$category:$key"
return reactiveRedisTemplate.opsForValue().get(compositeKey)
}
} |
- 다양한 state를 분류하기위해 category를 1뎁스 개념만 추가
KafkaStream
Code Block | ||
---|---|---|
| ||
data class HelloKStreamsResult(
val streams: KafkaStreams,
val helloKTable: KTable<String, HelloKTableState>
)
fun createHelloKStreams( redisService: RedisService): HelloKStreamsResult {
val props = Properties().apply {
put("bootstrap.servers", "localhost:9092,localhost:9003,localhost:9004")
put("application.id", "unique-hello-ktable-actor")
put("default.key.serde", Serdes.String().javaClass.name)
put("default.value.serde", Serdes.String().javaClass.name)
put("processing.guarantee", "exactly_once") // Ensure exactly-once processing
put("session.timeout.ms", "30000") // Increase session timeout
put("heartbeat.interval.ms", "10000") // Adjust heartbeat interval
put("group.instance.id", "unique-instance-id") // Enable static membership
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
put(ProducerConfig.ACKS_CONFIG, "all") // Ensure all replicas acknowledge
put(ProducerConfig.RETRIES_CONFIG, 3) // Retry up to 3 times
}
val builder = StreamsBuilder()
val storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("hello-state-store"),
Serdes.String(),
Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
)
builder.addStateStore(storeBuilder)
// Create a KTable from the hello-log-store topic
val helloKTable = builder.table<String, HelloKTableState>(
"hello-log-store",
Consumed.with(Serdes.String(), Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())),
Materialized.with(
Serdes.String(),
Serdes.serdeFrom(HelloKTableStateSerializer(), HelloKTableStateDeserializer())
)
)
// Sync the KTable to Redis
syncHelloKTableToRedis(helloKTable, redisService)
val streams = KafkaStreams(builder.build(), props)
streams.setUncaughtExceptionHandler { exception ->
// Log the exception
println("Uncaught exception in Kafka Streams: ${exception.message}")
// Decide on the action: SHUTDOWN_CLIENT, REPLACE_THREAD, or SHUTDOWN_APPLICATION
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT
}
return HelloKStreamsResult(streams, helloKTable)
}
fun createKafkaProducer(): KafkaProducer<String, HelloKTableState> {
val props = Properties().apply {
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HelloKTableStateSerializer::class.java.name)
}
return KafkaProducer(props)
}
fun syncHelloKTableToRedis(helloKTable: KTable<String, HelloKTableState>, redisService: RedisService) {
helloKTable.toStream().foreach { key, value ->
if (value != null) {
println("Syncing $key to Redis - syncHelloKTableToRedis")
redisService.setValue("hello-state-store", key, ObjectMapper().writeValueAsString(value)).subscribe()
}
}
}
fun <K, V> getStateStoreWithRetries(
streams: KafkaStreams,
storeName: String,
maxRetries: Int = 10,
retryIntervalMs: Long = 1000
): ReadOnlyKeyValueStore<K, V> {
var retries = 0
while (retries < maxRetries) {
try {
return streams.store(
StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore<K, V>())
)
} catch (e: InvalidStateStoreException) {
println("State store is not yet ready, waiting...")
retries++
Thread.sleep(retryIntervalMs)
}
}
throw IllegalStateException("Could not get state store after $maxRetries retries")
} |
- createHelloKStreams : 카프카 스트림을 생성하며 외부에서 start할수 있습니다.
- syncHelloKTableToRedis : kafka의 토픽을 소비해 redis에 update하는 KTable Stream을 처리하는 코드이며, 여기서 다양한 도메인기능을 적용할수 있습니다.
- createKafkaProducer : topic을 생성하는 순수 생산자
- getStateStoreWithRetries : 최초 스트림시작시 StateStore가 준비되는 시간을 기다리는 시간 ( 어플리케이션 시작시 고려 )
TEST
Code Block | ||
---|---|---|
| ||
@ExtendWith(SpringExtension::class)
@SpringBootTest
class HelloKStreamTest {
private lateinit var producer: KafkaProducer<String, HelloKTableState>
private lateinit var kstreams: HelloKStreamsResult
@Autowired
lateinit var redisService: RedisService
@BeforeTest
fun setUp() {
producer = createKafkaProducer()
kstreams = createHelloKStreams(redisService)
val latch = CountDownLatch(1)
kstreams.streams.setStateListener { newState, _ ->
if (newState == KafkaStreams.State.RUNNING) {
latch.countDown()
}
}
kstreams.streams.start()
latch.await()
// Wait for the state store to be ready
val stateStore: ReadOnlyKeyValueStore<String, HelloKTableState> = getStateStoreWithRetries(
kstreams.streams, "hello-state-store"
)
}
@AfterTest
fun tearDown() {
if (::producer.isInitialized) {
producer.close(Duration.ofSeconds(3))
}
kstreams.streams.close()
}
@Test
fun testHelloCommand() {
var testPersistId = "testid-01"
for(i in 1L..10L) {
val curState = HelloKTableState(HelloKState.HAPPY, i, i * 10)
producer.send(org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", testPersistId, curState))
}
// Wait for the state store to be ready
Thread.sleep(5000)
}
} |
- setUp : 어플리케이션 시작코드에서 이용될수 있으며 , 테스트 시작전 카프카 생상자/스트림 소비자를 생성합니다.
- testHelloCommand : 동일키로 HelloState 값을 10개 발행합니다.
- 기대결과 : 카프카 토픽데이터 연속성데이터 10개 , Redis는 최종 State 1개값
TEST RESULT
테스트 코드에서 별도의 검증은 수행되지 않았으며 툴을 통해 데이터 유실여부및 의도대로 작동되는지 확인할수 있습니다.
여기서 설명된 전체코드는 다음을통해 확인및 수행가능합니다.
KafkaStrem
Kafka에서 KTable을 이용한 KStream 은 더 복잡하고 다양한 처리를 토픽을 소비해 실시간성 처리를 할수 있으며
긴주기의 배치가 아닌 데이터가 흘러왔을때 의도된 단위로 빠르게 처리를 해 다시 흘려보낼때 KafkaStream을 활용할수 있습니다.