메시지가 발생할때 명령의 이벤트 목록을 생성하고 저장을 합니다.

이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 

이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 

이벤트 소싱의 특징

  • 개체-관계형 임피던스 불일치가 존재하지 않습니다.
  • 신뢰할 수 있는 시스템 기록을 확보할 수 있습니다.
  • 메시지 중심(message-driven) 아키텍처에 적합합니다.
  • 저장소 규모를 확장하기 쉽습니다.
  • CRUD가 아닌 CQRS의 한종류입니다.

이벤트 소싱을 위한 부가적인 기능

  • 이벤트를 저장하고 복구하는방법
  • 이벤트를 무한하게 재생할수 없음으로, 스냅샷을 이용하는 방법

유닛테스트를 통해 살펴보는 장바구니 이벤트 소싱

namespace AkkaNetCoreTest.Actors
{
    public class PersistentActorTest : TestKitXunit
    {
        protected TestProbe probe;

        protected IActorRef persistentActor;

        public PersistentActorTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        public void Setup()
        {
            //여기서 관찰자는 장바구니에 담긴 상품수를 검사할수 있습니다.
            probe = this.CreateTestProbe();

            persistentActor = Sys.ActorOf(Props.Create(() => new MyPersistentActor(probe)), "persistentActor");   
        }

        //이벤트 소싱 테스트
        [Theory(DisplayName = "이벤트소싱-이벤트는 상태화되고 재생되고 복구되어야한다")]
        [InlineData(5)]
        public void Test1(int cutoffSec)
        {
            // usage
            int expectedCount = 2;

            //선택 장애 장바구니 이벤트
            Cmd cmd1 = new Cmd("장바구니를 물건을 담음+1");
            Cmd cmd2 = new Cmd("장바구니에 물건을 뺌-0");
            Cmd cmd3 = new Cmd("장바구니에 물건을 담음+1");
            Cmd cmd4 = new Cmd("장바구니에 물건을 담음+2");

            Within(TimeSpan.FromSeconds(cutoffSec), () =>
            {
                persistentActor.Tell(cmd1);
                persistentActor.Tell(cmd2);
                persistentActor.Tell(cmd3);
                persistentActor.Tell(cmd4);
                persistentActor.Tell("print"); //현재까지 액터가 가진 이벤트리스트를 재생합니다.
                Assert.Equal(expectedCount, probe.ExpectMsg<int>());

                //액터를 강제로 죽입니다.
                persistentActor.Tell(Kill.Instance);
                Task.Delay(500).Wait();

                //시스템 셧다운후,재시작 시나리오
                //액터를 다시생성하여, 액터가 가진 이벤트가 복구되는지 확인합니다.
                persistentActor = Sys.ActorOf(Props.Create(() => new MyPersistentActor(probe)), "persistentActor");
                persistentActor.Tell("print");
                Assert.Equal(expectedCount, probe.ExpectMsg<int>());

            });
        }
    }
}


이벤트 소싱을 위한 구현체

MyPersistentActor
using System;
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Persistence;

namespace AkkaNetCore.Actors.Study
{
    #region MessageData
    public class Shutdown { }

    //커멘드와 이벤트를 분리합니다. 커멘드는 이벤트를 발생시키는 명령이며
    //1 커멘드는 n개의 이벤트로 복제가 될수 있습니다.
    public class Cmd
    {
        public Cmd(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

    public class Evt
    {
        public Evt(string data)
        {
            Data = data;
        }

        public string Data { get; }
    }

    public class ExampleState
    {
        private readonly ImmutableList<string> _events;

        public ExampleState(ImmutableList<string> events)
        {
            _events = events;
        }

        public ExampleState() : this(ImmutableList.Create<string>())
        {
        }

        public ExampleState Updated(Evt evt)
        {
            return new ExampleState(_events.Add(evt.Data));
        }

        public ExampleState RemoveLastItem()
        {
            return new ExampleState(_events.RemoveAt(_events.Count-1));
        }

        public int Size => _events.Count;

        public override string ToString()
        {
            return string.Join(", ", _events.Reverse());
        }

        public int CountInBasket()
        {
            int Count = 0;
            //이벤트를 재생하여 실제 장바구니에 담긴 상품수를 반환한다.
            foreach(var str in _events)
            {
                if (str.Contains("담음"))
                {
                    Count++;
                }
                else if (str.Contains("뺌"))
                {
                    Count--;
                }
            }
            return Count;
        }
    }

    #endregion

    #region Actor
    public class MyPersistentActor : UntypedPersistentActor
    {
        private ExampleState _state = new ExampleState();

        protected IActorRef probe;

        public MyPersistentActor(IActorRef probe)
        {
            this.probe = probe;
        }

        private void UpdateState(Evt evt)
        {
            _state = _state.Updated(evt);
        }

        private int NumEvents => _state.Size;

        public override Recovery Recovery => new Recovery(fromSnapshot: SnapshotSelectionCriteria.None);

        protected override void OnRecover(object message)
        {
            switch (message)
            {
                case Evt evt:
                    UpdateState(evt);
                    break;
                case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState:
                    _state = (ExampleState)snapshot.Snapshot;
                    break;
            }
        }

        protected override void OnCommand(object message)
        {
            switch (message)
            {
                case Cmd cmd:
                    // Command는 두개의 이벤트로 복제될수 있습니다.
                    //Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState);

                    Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt =>
                    {
                        UpdateState(evt);
                        Context.System.EventStream.Publish(evt);
                    });
                    break;
                case "snap":
                    SaveSnapshot(_state);
                    break;
                case "print":
                    //모든 이벤트를 보여준다.
                    Console.WriteLine("Try print");
                    Console.WriteLine(_state);
                    
                    //이벤트 재생시 사이즈를 알려준다.(이벤트 복구 검증용)
                    probe.Tell(_state.CountInBasket());
                    break;
                case Shutdown down:
                    probe.Tell("die");
                    Context.Stop(Self);
                    break;
            }
        }

        public override string PersistenceId { get; } = "sample-id-1";  //영속성을 위한 고유한 아이디값을 가집니다.
    }

    #endregion
}



  • No labels