Page History
...
✅ Akka.NET 액터모델 + Akka.Persistence 를 활용한 CQRS + 이벤트 소싱
1. 핵심 구성요소
| 구성요소 | 설명 |
|---|---|
PersistentActor | 이벤트를 저장하고 재생할 수 있는 상태 유지 액터 |
Snapshotting | 빠른 복구를 위해 특정 시점의 상태를 저장 |
Event Journal | 모든 상태 변화(event)를 append-only 로그로 저장 |
Read Model Actor | 쿼리에 최적화된 projection을 담당 |
Command Handler Actor | 명령(Command)을 받아 이벤트로 전환 및 persistence 수행 |
2. CQRS(이벤트 소싱) 다이어그램 with ActorModel
- 이벤트가 발생할때 즉각 CRUD하여 이용하는 패턴이 아닌 액터모델의 메일박스를 이용해 상태있는 프로그래밍을 통해 이벤트소싱을 설계할수 있으며~ 이러한 느낌
...
고객 주문 시스템:
OrderActor가PlaceOrderCommand를 받고OrderPlacedEvent를 생성 및 저장챗봇 세션 관리:
SessionActor가 메시지를 이벤트로 저장하여 상태 기반 대화 흐름 관리금융 거래 기록:
AccountActor가Withdrawn,Deposited이벤트를 기록하여 완전한 거래 이력 확보
SalesActor
| Code Block | ||||
|---|---|---|---|---|
| ||||
public class SalesActor: ReceivePersistentActor
{
// The unique actor id
public override string PersistenceId => "sales-actor";
// The state that will be persisted in SNAPSHOTS
private SalesActorState _state;
public SalesActor(long expectedProfit, TaskCompletionSource<bool> taskCompletion)
{
_state = new SalesActorState
{
totalSales = 0
};
// Process a sale:
Command<Sale>(saleInfo =>
{
if (_state.totalSales < expectedProfit)
{
// Persist an EVENT to RavenDB
// ===========================
// The handler function is executed after the EVENT was saved successfully
Persist(saleInfo, _ =>
{
// Update the latest state in the actor
_state.totalSales += saleInfo.Price;
ConsoleHelper.WriteToConsole(ConsoleColor.Black,
$"Sale was persisted. Phone brand: {saleInfo.Brand}. Price: {saleInfo.Price}");
// Store a SNAPSHOT every 5 sale events
// ====================================
if (LastSequenceNr != 0 && LastSequenceNr % 5 == 0)
{
SaveSnapshot(_state.totalSales);
}
});
}
else if (!taskCompletion.Task.IsCompleted)
{
Sender.Tell(new StopSimulate());
ConsoleHelper.WriteToConsole(ConsoleColor.DarkMagenta,
$"Sale not persisted: " +
$"Total sales have already reached the expected profit of {expectedProfit}");
ConsoleHelper.WriteToConsole(ConsoleColor.DarkMagenta,
_state.ToString());
taskCompletion.TrySetResult(true);
}
});
// Handle a SNAPSHOT success msg
Command<SaveSnapshotSuccess>(success =>
{
ConsoleHelper.WriteToConsole(ConsoleColor.Blue,
$"Snapshot saved successfully at sequence number {success.Metadata.SequenceNr}");
// Optionally, delete old snapshots or events here if needed
// DeleteMessages(success.Metadata.SequenceNr);
});
// Recover an EVENT
Recover<Sale>(saleInfo =>
{
_state.totalSales += saleInfo.Price;
ConsoleHelper.WriteToConsole(ConsoleColor.DarkGreen,
$"Event was recovered. Price: {saleInfo.Price}");
});
// Recover a SNAPSHOT
Recover<SnapshotOffer>(offer =>
{
var salesFromSnapshot = (long) offer.Snapshot;
_state.totalSales = salesFromSnapshot;
ConsoleHelper.WriteToConsole(ConsoleColor.DarkGreen,
$"Snapshot was recovered. Total sales from snapshot: {salesFromSnapshot}");
});
}
} |
이 SalesActor 클래스는 **Akka.NET의 ReceivePersistentActor**를 상속하여 **이벤트 소싱(Event Sourcing)**과 스냅샷(Snapshot) 기능을 활용해 상태를 안정적으로 유지하는 **영속 액터(Persistent Actor)**입니다. 아래에 코드 전체 흐름과 구성 요소를 설명드리겠습니다.
...
🔍 전체 구조 분석
📌 클래스 목적
판매 이벤트(Sale)를 처리하고 누적된 매출을 상태로 유지
특정 조건(예: 매출 목표 달성)에 도달하면 처리 중단
RavenDB 등을 백엔드로 사용해 이벤트를 저장
일정 횟수마다 스냅샷 저장하여 복구 최적화
🧱 주요 구성 요소 분석
1. PersistenceId
| Code Block | ||
|---|---|---|
| ||
public override string PersistenceId => "sales-actor"; |
액터의 고유 식별자로 이벤트와 스냅샷을 식별하기 위한 키
RavenDB 또는 다른
Akka.Persistencebackend에서 이 키로 데이터를 저장/조회함
2. 상태 관리 변수
| Code Block | ||
|---|---|---|
| ||
private SalesActorState _state; |
현재 액터 상태를 저장하는 내부 객체
여기서는
totalSales(누적 매출액)를 관리
3. 명령 처리 (Command<Sale>)
| Code Block | ||
|---|---|---|
| ||
Command<Sale>(saleInfo => { ... }); |
외부에서 Sale 메시지를 받으면 처리
조건에 따라 이벤트를 Persist하거나 무시
주요 로직
| Code Block | ||
|---|---|---|
| ||
Persist(saleInfo, _ =>
{
_state.totalSales += saleInfo.Price;
if (LastSequenceNr % 5 == 0) {
SaveSnapshot(_state.totalSales);
}
}); |
Persist()는 이벤트 소싱의 핵심: 이벤트를 저장한 후 상태를 변경5개의 Sale 이벤트마다
SaveSnapshot()호출 → 빠른 복구용
4. 종료 시그널 전달
| Code Block | ||
|---|---|---|
| ||
Sender.Tell(new StopSimulate());
taskCompletion.TrySetResult(true); |
매출 목표(
expectedProfit)에 도달하면, 더 이상 이벤트를 저장하지 않고 종료 시그널 전송TaskCompletionSource는 테스트나 외부 시뮬레이션 종료에 사용
5. 스냅샷 성공 처리
| Code Block | ||
|---|---|---|
| ||
Command<SaveSnapshotSuccess>(success => {
Console.WriteLine(...);
}); |
스냅샷이 성공적으로 저장되었을 때 로깅
필요시
DeleteMessages()같은 클린업 가능
6. 복구 처리 (Recover<>)
이벤트 복구
| Code Block | ||
|---|---|---|
| ||
Recover<Sale>(saleInfo => {
_state.totalSales += saleInfo.Price;
}); |
스냅샷 복구
| Code Block | ||
|---|---|---|
| ||
Recover<SnapshotOffer>(offer => {
_state.totalSales = (long)offer.Snapshot;
});
|
가장 최근의 스냅샷부터 시작 → 남은 이벤트만 재생
✅ 핵심 특징 요약
| 기능 | 설명 |
|---|---|
| 🎯 명령-이벤트 분리 | Sale은 명령(Command), Persist된 이벤트로 상태 변경 |
| 🧠 이벤트 소싱 기반 상태 | 상태 변경 내역은 전부 이벤트로 기록되어 리플레이 가능 |
| 💾 스냅샷 | 매 5건 이벤트마다 현재 상태 저장으로 복구 최적화 |
| 🔄 복구 내장 | 스냅샷 → 이벤트 순으로 복원함 |
| 🚦 목표 달성시 종료 | 외부 TaskCompletionSource로 흐름 제어 |
NetCoreLabs 샘플코드
여기서 실험된 작동기능은 다음 저장소에서 실행및 수행가능하며 Akka.net 기반의 액터모델을 중심으로한 다양한 기능들을 확인할수 있습니다.
...
