Page History
...
- 지연 시간: RDB 작업은 디스크 I/O 및 네트워크 지연을 수반하므로 메모리 내 작업에 비해 느릴 수 있습니다.
- 동시성: 높은 동시성을 처리하는 것은 도전적일 수 있으며, 병목 현상을 피하기 위해 신중한 트랜잭션 관리가 필요합니다.
- 복잡성: 스키마 관리, 인덱스 최적화 및 쿼리 최적화는 애플리케이션에 복잡성을 추가할 수 있습니다.
- 오버헤드: RDB는 ACID 속성을 유지하고 데이터 무결성을 보장하기 위해 오버헤드를 도입합니다.
- 확장성: RDB는 확장 가능하지만, 분산 메모리 내 액터 시스템의 선형 확장성을 따라가지 못할 수 있습니다.
액터를 이용해 CQRS 패턴으로 구현하기
RedisService구현
Code Block | ||
---|---|---|
| ||
implementation("org.springframework.boot:spring-boot-starter-data-redis-reactive")
@Service
class RedisService(private val reactiveRedisTemplate: ReactiveRedisTemplate<String, String>) {
fun setValue(category: String, key: String, value: String): Mono<Boolean> {
val compositeKey = "$category:$key"
return reactiveRedisTemplate.opsForValue().set(compositeKey, value)
}
fun getValue(category: String, key: String): Mono<String?> {
val compositeKey = "$category:$key"
return reactiveRedisTemplate.opsForValue().get(compositeKey)
}
} |
- 상태값을 가져오고 저장하는 코드는 심플하며, 복잡한 관계형 DB필요없이 공통으로 Value 객체를 이용하게 됩니다.
액터구현
코틀린 언어가 지원하는 순수 액터가 이용되었으며
Code Block | ||
---|---|---|
| ||
class HelloKTableActor(
private val persistenceId:String ,
private val producer: KafkaProducer<String, HelloKTableState>,
private val redisService: RedisService
) {
private val channel = Channel<HelloKTableActorCommand>()
private var curState: HelloKTableState
init {
// Read initial state from Redis
curState = redisService.getValue("hello-state-store", persistenceId)
.map { stateJson ->
// Deserialize stateJson to HelloKTableState
// Assuming you have a method to deserialize JSON to HelloKTableState
stateJson?.let { deserializeState(it) }
}
.block() ?: HelloKTableState(HelloKState.HAPPY, 0, 0) // Default state if not found
|
- 액터는 고유 시별ID(논리적구분 여기서는 사용자별) 를 가지며 , 초기화시 Redis로부터 마지막 상태값을 읽어옵니다.
- producer 는 이벤트 로그를 kafka에 전송하기위해 이용되었으며 이 장치는 없어도 동작에 영향을 끼치지 않습니다. - 시계열데이터가 있기때문에 이벤트 소싱에서 이용가능
이벤트 처리기
Code Block | ||
---|---|---|
| ||
private fun handleHello(command: HelloKtable) {
if (curState.state == HelloKState.HAPPY && command.message == "Hello") {
val newState = curState.copy(helloCount = curState.helloCount + 1, helloTotalCount = curState.helloTotalCount + 1)
curState = newState
// Save state to Redis
redisService.setValue("hello-state-store", persistenceId, serializeState(curState)).subscribe()
// Update KTable with new state
//stateStore.put(persistenceId, newState)
producer.send(org.apache.kafka.clients.producer.ProducerRecord("hello-log-store", persistenceId, curState))
command.replyTo.complete(HelloKStateResponse("Kotlin"))
} else if (curState.state == HelloKState.ANGRY) {
command.replyTo.complete(HelloKStateResponse("Don't talk to me!"))
}
} |
- 나의 상태가 Happy 일때만 반응하며 아닌경우 거부합니다.
- Redis를 통해 새로운 상태를 저장합니다.
- Kafka를 통해 로그성 데이터를 생산합니다.
헬로우 카운트 조회
Code Block | ||
---|---|---|
| ||
private fun handleGetHelloCount(command: GetHelloKtableCount) {
command.replyTo.complete(HelloKStateCountResponse(curState.helloCount))
} |
- 액터를 통한 상태프로그래밍에 의해 HelloCount 가 동기화가 되었기때문에~ Redis를 별도로 호출할 필요없이 이미 알고 있는 상태를 반환합니다.
Redis와 Kafka 유실없음 확인
AKKA 가 CQRS를 위해 액터모델에 지원하는 장치는 다음과같습니다.
Journal
이벤트 소싱(Event Sourcing) 방식을 사용합니다1.
액터의 상태 변경을 나타내는 이벤트들을 순차적으로 저장합니다1.
추가 전용(append-only) 로그 형태로 이벤트를 저장합니다1.
액터의 전체 상태 변경 이력을 보존합니다.
액터 복구 시 저장된 이벤트들을 재생하여 상태를 복원합니다3.
Snapshot
액터의 전체 상태를 특정 시점에 저장합니다1.
복구 시간을 최적화하기 위한 용도로 사용됩니다4.
전체 이벤트 이력을 재생하지 않고도 빠르게 상태를 복원할 수 있게 해줍니다1.
Journal과 함께 사용되며, 가장 최근 스냅샷 이후의 이벤트만 재생하면 됩니다1.
Durable State
액터의 최신 상태만을 저장합니다
이벤트 이력을 저장하지 않고 현재 상태만 유지합니다.
CRUD 기반 애플리케이션과 유사한 방식으로 동작합니다
상태 변경 시마다 전체 상태를 덮어씁니다.
주요 차이점:
- 저장 방식: Journal은 이벤트 로그, Snapshot은 전체 상태의 특정 시점 복사본, Durable State는 최신 상태만 저장합니다.
- 복구 프로세스: Journal은 모든 이벤트 재생, Snapshot은 최근 스냅샷 + 이후 이벤트 재생, Durable State는 최신 상태만 로드합니다.
- 데이터 보존: Journal은 전체 이력 보존, Snapshot과 Durable State는 특정 시점/최신 상태만 보존합니다.
- 사용 사례: Journal은 감사와 시간 기반 쿼리에 유용, Snapshot은 복구 최적화, Durable State는 단순한 상태 관리에 적합합니다.
코틀린 순수 액터모델을 사용해 Journal + durable State 컨셉을 직접 구현해 적용되었습니다.
모델의 Value 변화에따른 이벤트 버전관리는 제외 되었으며 완전한 컨셉은 PersistentDurableStateActor 를 통해 확인할수 있습니다.
전체 코드및 테스트코드
여기서 설명하는 전체코드를 확인할수 있으며 유닛테스트를 통해 기능확인및 성능테스트를 시도해볼수 있습니다.
- https://github.com/psmon/java-labs/tree/master/KotlinBootLabs/src/main/kotlin/com/example/kotlinbootlabs/kactor
- https://github.com/psmon/java-labs/tree/master/KotlinBootLabs/src/test/kotlin/com/example/kotlinbootlabs/kactor
이러한 상태관리 프로그래밍의 방식이 꼭 액터모델을 통해서 할수 있는것은 아니며 카프카의 Stream의 KTable을 통해서도 이러한 개념을 대체해 적용할수 있습니다.
상태관리 프로그래밍 방식이 왜 카프카에도 도입되고 활용하고 있는지?
카프카에서 도입된 스트림을 통한 상태관리 프로그래밍 예
- https://velog.io/@ehdrms2034/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88-DSL-%EA%B0%9C%EB%85%90
- https://breezymind.com/kafka-streams-basic/