Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

✅ Akka.NET 액터모델 + Akka.Persistence 를 활용한 CQRS + 이벤트 소싱

1. 핵심 구성요소


구성요소설명
PersistentActor이벤트를 저장하고 재생할 수 있는 상태 유지 액터
Snapshotting빠른 복구를 위해 특정 시점의 상태를 저장
Event Journal모든 상태 변화(event)를 append-only 로그로 저장
Read Model Actor쿼리에 최적화된 projection을 담당
Command Handler Actor명령(Command)을 받아 이벤트로 전환 및 persistence 수행


2. CQRS(이벤트 소싱) 다이어그램 with ActorModel

Image Added

  • 이벤트가 발생할때 즉각 CRUD하여 이용하는 패턴이 아닌 액터모델의 메일박스를 이용해 상태있는 프로그래밍을 통해 이벤트소싱을 설계할수 있으며~ 이러한 느낌

...

  • 고객 주문 시스템: OrderActorPlaceOrderCommand를 받고 OrderPlacedEvent를 생성 및 저장

  • 챗봇 세션 관리: SessionActor가 메시지를 이벤트로 저장하여 상태 기반 대화 흐름 관리

  • 금융 거래 기록: AccountActorWithdrawn, Deposited 이벤트를 기록하여 완전한 거래 이력 확보



SalesActor

Code Block
themeEmacs
linenumberstrue
public class SalesActor: ReceivePersistentActor
{
    // The unique actor id
    public override string PersistenceId => "sales-actor";
    
    // The state that will be persisted in SNAPSHOTS
    private SalesActorState _state;
    
    public SalesActor(long expectedProfit, TaskCompletionSource<bool> taskCompletion)
    {
        _state = new SalesActorState
        {
            totalSales = 0
        }; 
        
        // Process a sale:
        Command<Sale>(saleInfo =>
        {
            if (_state.totalSales < expectedProfit)
            {
                // Persist an EVENT to RavenDB
                // ===========================
                
                // The handler function is executed after the EVENT was saved successfully
                Persist(saleInfo, _ =>
                {
                    // Update the latest state in the actor
                    _state.totalSales += saleInfo.Price;

                    ConsoleHelper.WriteToConsole(ConsoleColor.Black,
                        $"Sale was persisted. Phone brand: {saleInfo.Brand}. Price: {saleInfo.Price}");

                    // Store a SNAPSHOT every 5 sale events
                    // ====================================
                    
                    if (LastSequenceNr != 0 && LastSequenceNr % 5 == 0)
                    {
                        SaveSnapshot(_state.totalSales);
                    }
                });
            }
            else if (!taskCompletion.Task.IsCompleted)
            {
                Sender.Tell(new StopSimulate());
                
                ConsoleHelper.WriteToConsole(ConsoleColor.DarkMagenta,
                    $"Sale not persisted: " +
                    $"Total sales have already reached the expected profit of {expectedProfit}");
                
                ConsoleHelper.WriteToConsole(ConsoleColor.DarkMagenta,
                    _state.ToString());
                
                taskCompletion.TrySetResult(true);
            }
        });
        
        // Handle a SNAPSHOT success msg
        Command<SaveSnapshotSuccess>(success =>
        {
            ConsoleHelper.WriteToConsole(ConsoleColor.Blue,
                $"Snapshot saved successfully at sequence number {success.Metadata.SequenceNr}");
            
            // Optionally, delete old snapshots or events here if needed
            // DeleteMessages(success.Metadata.SequenceNr);
        });
        
        // Recover an EVENT
        Recover<Sale>(saleInfo =>
        {
            _state.totalSales += saleInfo.Price;
            
            ConsoleHelper.WriteToConsole(ConsoleColor.DarkGreen,
                $"Event was recovered. Price: {saleInfo.Price}");
        });
        
        // Recover a SNAPSHOT
        Recover<SnapshotOffer>(offer =>
        {
            var salesFromSnapshot = (long) offer.Snapshot;
            _state.totalSales = salesFromSnapshot;
            
            ConsoleHelper.WriteToConsole(ConsoleColor.DarkGreen,
                $"Snapshot was recovered. Total sales from snapshot: {salesFromSnapshot}");
        });
    }
}

SalesActor 클래스는 **Akka.NET의 ReceivePersistentActor**를 상속하여 **이벤트 소싱(Event Sourcing)**과 스냅샷(Snapshot) 기능을 활용해 상태를 안정적으로 유지하는 **영속 액터(Persistent Actor)**입니다. 아래에 코드 전체 흐름과 구성 요소를 설명드리겠습니다.

...

🔍 전체 구조 분석

📌 클래스 목적

  • 판매 이벤트(Sale)를 처리하고 누적된 매출을 상태로 유지

  • 특정 조건(예: 매출 목표 달성)에 도달하면 처리 중단

  • RavenDB 등을 백엔드로 사용해 이벤트를 저장

  • 일정 횟수마다 스냅샷 저장하여 복구 최적화

🧱 주요 구성 요소 분석

1. PersistenceId

Code Block
themeEmacs
public override string PersistenceId => "sales-actor";


  • 액터의 고유 식별자로 이벤트와 스냅샷을 식별하기 위한 키

  • RavenDB 또는 다른 Akka.Persistence backend에서 이 키로 데이터를 저장/조회함


2. 상태 관리 변수

Code Block
themeEmacs
private SalesActorState _state;


  • 현재 액터 상태를 저장하는 내부 객체

  • 여기서는 totalSales (누적 매출액)를 관리

3. 명령 처리 (Command<Sale>)

Code Block
themeEmacs
Command<Sale>(saleInfo => { ... });
  • 외부에서 Sale 메시지를 받으면 처리

  • 조건에 따라 이벤트를 Persist하거나 무시


주요 로직

Code Block
themeEmacs
Persist(saleInfo, _ =>
{
    _state.totalSales += saleInfo.Price;
    
    if (LastSequenceNr % 5 == 0) {
        SaveSnapshot(_state.totalSales);
    }
});
  • Persist()는 이벤트 소싱의 핵심: 이벤트를 저장한 후 상태를 변경

  • 5개의 Sale 이벤트마다 SaveSnapshot() 호출 → 빠른 복구용


4. 종료 시그널 전달

Code Block
themeEmacs
Sender.Tell(new StopSimulate());
taskCompletion.TrySetResult(true);
  • 매출 목표(expectedProfit)에 도달하면, 더 이상 이벤트를 저장하지 않고 종료 시그널 전송

  • TaskCompletionSource는 테스트나 외부 시뮬레이션 종료에 사용


5. 스냅샷 성공 처리

Code Block
themeEmacs
Command<SaveSnapshotSuccess>(success => {
    Console.WriteLine(...);
});
  • 스냅샷이 성공적으로 저장되었을 때 로깅

  • 필요시 DeleteMessages() 같은 클린업 가능


6. 복구 처리 (Recover<>)

이벤트 복구

Code Block
themeEmacs
Recover<Sale>(saleInfo => {
    _state.totalSales += saleInfo.Price;
});

스냅샷 복구

Code Block
themeEmacs
Recover<SnapshotOffer>(offer => {
    _state.totalSales = (long)offer.Snapshot;
});


가장 최근의 스냅샷부터 시작 → 남은 이벤트만 재생


✅ 핵심 특징 요약


기능설명
🎯 명령-이벤트 분리Sale은 명령(Command), Persist된 이벤트로 상태 변경
🧠 이벤트 소싱 기반 상태상태 변경 내역은 전부 이벤트로 기록되어 리플레이 가능
💾 스냅샷매 5건 이벤트마다 현재 상태 저장으로 복구 최적화
🔄 복구 내장스냅샷 → 이벤트 순으로 복원함
🚦 목표 달성시 종료외부 TaskCompletionSource로 흐름 제어



NetCoreLabs 샘플코드

여기서 실험된 작동기능은 다음 저장소에서 실행및 수행가능하며 Akka.net 기반의 액터모델을 중심으로한 다양한 기능들을 확인할수 있습니다.  

...