메시지가 발생할때 명령의 이벤트 목록을 생성하고 저장을 합니다.

이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 

이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 

이벤트 소싱의 특징

이벤트 소싱을 위한 부가적인 기능

유닛테스트를 통해 살펴보는 장바구니 이벤트 소싱

namespace AkkaNetCoreTest.Actors
{
    public class PersistentActorTest : TestKitXunit
    {
        protected TestProbe probe;

        protected IActorRef persistentActor;

        public PersistentActorTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        public void Setup()
        {
            //여기서 관찰자는 장바구니에 담긴 상품수를 검사할수 있습니다.
            probe = this.CreateTestProbe();

            persistentActor = Sys.ActorOf(Props.Create(() => new MyPersistentActor(probe)), "persistentActor");   
        }

        //이벤트 소싱 테스트
        [Theory(DisplayName = "이벤트소싱-이벤트는 상태화되고 재생되고 복구되어야한다")]
        [InlineData(5)]
        public void Test1(int cutoffSec)
        {
            // usage
            int expectedCount = 2;

            //선택 장애 장바구니 이벤트
            Cmd cmd1 = new Cmd("장바구니를 물건을 담음+1");
            Cmd cmd2 = new Cmd("장바구니에 물건을 뺌-0");
            Cmd cmd3 = new Cmd("장바구니에 물건을 담음+1");
            Cmd cmd4 = new Cmd("장바구니에 물건을 담음+2");

            Within(TimeSpan.FromSeconds(cutoffSec), () =>
            {
                persistentActor.Tell(cmd1);
                persistentActor.Tell(cmd2);
                persistentActor.Tell(cmd3);
                persistentActor.Tell(cmd4);
                persistentActor.Tell("print"); //현재까지 액터가 가진 이벤트리스트를 재생합니다.
                Assert.Equal(expectedCount, probe.ExpectMsg<int>());

                //액터를 강제로 죽입니다.
                persistentActor.Tell(Kill.Instance);
                Task.Delay(500).Wait();

                //시스템 셧다운후,재시작 시나리오
                //액터를 다시생성하여, 액터가 가진 이벤트가 복구되는지 확인합니다.
                persistentActor = Sys.ActorOf(Props.Create(() => new MyPersistentActor(probe)), "persistentActor");
                persistentActor.Tell("print");
                Assert.Equal(expectedCount, probe.ExpectMsg<int>());

            });
        }
    }
}


이벤트 소싱을 위한 구현체

using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Persistence;

namespace AkkaNetCore.Actors.Study
{
    #region MessageData
    public class Shutdown { }

    //커멘드와 이벤트를 분리합니다. 커멘드는 이벤트를 발생시키는 명령이며
    //1 커멘드는 n개의 이벤트로 복제가 될수 있습니다.
    public class Cmd
    {
        public Cmd(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

    public class Evt
    {
        public Evt(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

    public class ExampleState
    {
        private readonly ImmutableList<string> _events;

        public ExampleState(ImmutableList<string> events)
        {
            _events = events;
        }

        public ExampleState() : this(ImmutableList.Create<string>())
        {
        }

        public ExampleState Updated(Evt evt)
        {
            return new ExampleState(_events.Add(evt.Data));
        }

        public ExampleState RemoveLastItem()
        {
            return new ExampleState(_events.RemoveAt(_events.Count-1));
        }

        public int Size => _events.Count;

        public override string ToString()
        {
            return string.Join(", ", _events.Reverse());
        }

        public int CountInBasket()
        {
            int Count = 0;
            //이벤트를 재생하여 실제 장바구니에 담긴 상품수를 반환한다.
            foreach(var str in _events)
            {
                if (str.Contains("담음"))
                {
                    Count++;
                }
                else if (str.Contains("뺌"))
                {
                    Count--;
                }
            }
            return Count;
        }
    }

    #endregion

    #region Actor
    public class MyPersistentActor : UntypedPersistentActor
    {
        private ExampleState _state = new ExampleState();

        protected IActorRef probe;

        public MyPersistentActor(IActorRef probe)
        {
            this.probe = probe;
        }

        private void UpdateState(Evt evt)
        {
            _state = _state.Updated(evt);
        }

        private int NumEvents => _state.Size;

        public override Recovery Recovery => new Recovery(fromSnapshot: SnapshotSelectionCriteria.None);

        protected override void OnRecover(object message)
        {
            switch (message)
            {
                case Evt evt:
                    UpdateState(evt);
                    break;
                case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState:
                    _state = (ExampleState)snapshot.Snapshot;
                    break;
            }
        }

        protected override void OnCommand(object message)
        {
            switch (message)
            {
                case Cmd cmd:
                    // Command는 두개의 이벤트로 복제될수 있습니다.
                    //Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState);

                    Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt =>
                    {
                        UpdateState(evt);
                        Context.System.EventStream.Publish(evt);
                    });
                    break;
                case "snap":
                    SaveSnapshot(_state);
                    break;
                case "print":
                    //모든 이벤트를 보여준다.
                    Console.WriteLine("Try print");
                    Console.WriteLine(_state);
                    
                    //이벤트 재생시 사이즈를 알려준다.(이벤트 복구 검증용)
                    probe.Tell(_state.CountInBasket());
                    break;
                case Shutdown down:
                    probe.Tell("die");
                    Context.Stop(Self);
                    break;
            }
        }

        public override string PersistenceId { get; } = "sample-id-1";  //영속성을 위한 고유한 아이디값을 가집니다.
    }

    #endregion
}