Page History
...
draw.io Board Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
- Hello Count 를 질의하기위해 영속장치를 접근할 필요없이 , 인메모리(상태프로그래밍) 에서 정확한 값을 바로 응답할수 있습니다.
- Redis에 마지막 값을 항상 유지함으로 ~ 업데이트 또는 장애복구시 Actor가 초기화될시 마지막값으로 상태를 복원해 시작할수 있습니다.
- 마지막 상태값 유지를 위해, 꼭 Redis일 필요없으며, 인메모리 기능을 이미 가지고 있기때문에 RDB에 단지 마지막 값을 유지할수도 있습니다.
- 이 모델은 Read를 위해 매번 RDB 조회할 필요가 없으며 , Read의 책임있는 DB를 인메모리가 아닌 다른곳에 위임할수도 있습니다.
- 상태있는 서비스를 통해 RDB만으로 Redis의 분산처리 캐시효과 줄수 있으며 영속성장치는 Redis보다는 RDB가 더 적합합니다. 이 샘플은 Redis의 Read속도보다 더 빠른장치를 만들고 비교하기위한 샘플입니다.
- 액터자체를 분산처리하고 관리 하는방식은 여기서 제외되었습니다. ( Akka에서 지원하는 기능 )
- 이벤트의 변화를 Kafka에 기록해둠으로 누군가는 이것을 소비해 시계열 기반 분석 기능을 작성할수도 있습니다. ( 이벤트 소싱패턴 활용한 다양한 기능을 구현 )
...
상태관리 프로그래밍 방식이 왜 카프카에도 도입되고 CRUD에서 해결하기 어려운 성능문제 해결에 이용되고 있는지 학습해볼 필요는 있습니다.
카프카에서 도입된 스트림을 통한 상태관리 프로그래밍 예
...
영속성을 위한 이벤트 스토어 DDL for Postgres 용
상태프로그래밍 영속성 이벤트 스토어용으로 RDB는 R2DBC(Reactive Drive)를 통해 여전히 높은 동시성 처리 저장장치로 활용할수 있습니다.
샘플은 Akka Actor에 연결되는 영속장치의 공식 DDL문입니다. - akka 2.7 기준 호환됨을 확인
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE IF NOT EXISTS event_journal(
slice INT NOT NULL,
entity_type VARCHAR(255) NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
seq_nr BIGINT NOT NULL,
db_timestamp timestamp with time zone NOT NULL,
event_ser_id INTEGER NOT NULL,
event_ser_manifest VARCHAR(255) NOT NULL,
event_payload BYTEA NOT NULL,
deleted BOOLEAN DEFAULT FALSE NOT NULL,
writer VARCHAR(255) NOT NULL,
adapter_manifest VARCHAR(255),
tags TEXT ARRAY,
meta_ser_id INTEGER,
meta_ser_manifest VARCHAR(255),
meta_payload BYTEA,
PRIMARY KEY(persistence_id, seq_nr)
);
-- `event_journal_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr);
CREATE TABLE IF NOT EXISTS snapshot(
slice INT NOT NULL,
entity_type VARCHAR(255) NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
seq_nr BIGINT NOT NULL,
write_timestamp BIGINT NOT NULL,
ser_id INTEGER NOT NULL,
ser_manifest VARCHAR(255) NOT NULL,
snapshot BYTEA NOT NULL,
meta_ser_id INTEGER,
meta_ser_manifest VARCHAR(255),
meta_payload BYTEA,
PRIMARY KEY(persistence_id)
);
CREATE TABLE IF NOT EXISTS durable_state (
slice INT NOT NULL,
entity_type VARCHAR(255) NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
revision BIGINT NOT NULL,
db_timestamp timestamp with time zone NOT NULL,
state_ser_id INTEGER NOT NULL,
state_ser_manifest VARCHAR(255),
state_payload BYTEA NOT NULL,
tags TEXT ARRAY,
PRIMARY KEY(persistence_id, revision)
);
-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);
-- Primitive offset types are stored in this table.
-- If only timestamp based offsets are used this table is optional.
-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table is not created.
CREATE TABLE IF NOT EXISTS projection_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
current_offset VARCHAR(255) NOT NULL,
manifest VARCHAR(32) NOT NULL,
mergeable BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
);
-- Timestamp based offsets are stored in this table.
CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
slice INT NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
seq_nr BIGINT NOT NULL,
-- timestamp_offset is the db_timestamp of the original event
timestamp_offset timestamp with time zone NOT NULL,
-- timestamp_consumed is when the offset was stored
-- the consumer lag is timestamp_consumed - timestamp_offset
timestamp_consumed timestamp with time zone NOT NULL,
PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);
CREATE TABLE IF NOT EXISTS projection_management (
projection_name VARCHAR(255) NOT NULL,
projection_key VARCHAR(255) NOT NULL,
paused BOOLEAN NOT NULL,
last_updated BIGINT NOT NULL,
PRIMARY KEY(projection_name, projection_key)
); |
카프카에서 도입된 스트림을 통한 상태관리 프로그래밍 예
- https://velog.io/@ehdrms2034/%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%8A%A4%ED%8A%B8%EB%A6%BC%EC%A6%88-DSL-%EA%B0%9C%EB%85%90
- https://breezymind.com/kafka-streams-basic/
대량데이터 배치처리가 아닌 준실시간성 이벤트 처리를 위한 액터 상태머신 샘플
- BulkProcessorActor - TODO : 순수 코틀린 버전으로 변환 예정
상태프로그래밍 / CQRS / 영속성 각각 -%EA%B0%9C%EB%85%90
- https://breezymind.com/kafka-streams-basic/CQRS와 영속성은 다른 컨셉으로 복잡한 도메인을 처리하기위한 CQRS의 기법설명은 여기서 제외되었으며 정밀기법은 다음을 참고합니다.
Next
이와같이 구현된 컨셉을, 액터모델없이 Kafka제공 순수 스트림프로그래밍을 통해 유사하게 구현해보겠습니다.
액터모델의 경우 OOP베이스에 함수형이 필요하면 혼합할수 있는 하이브리드 방식이라고 하면 스트리밍 프로그래밍은 함수형을 주로이용합니다.