메시지가 발생할때 명령의 이벤트 목록을 생성하고 저장을 합니다. 이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. |
이벤트 소싱의 특징
이벤트 소싱을 위한 부가적인 기능
namespace AkkaNetCoreTest.Actors { public class PersistentActorTest : TestKitXunit { protected TestProbe probe; protected IActorRef persistentActor; public PersistentActorTest(ITestOutputHelper output) : base(output) { Setup(); } public void Setup() { //여기서 관찰자는 장바구니에 담긴 상품수를 검사할수 있습니다. probe = this.CreateTestProbe(); persistentActor = Sys.ActorOf(Props.Create(() => new MyPersistentActor(probe)), "persistentActor"); } //이벤트 소싱 테스트 [Theory(DisplayName = "이벤트소싱-이벤트는 상태화되고 재생되고 복구되어야한다")] [InlineData(5)] public void Test1(int cutoffSec) { // usage int expectedCount = 2; //선택 장애 장바구니 이벤트 Cmd cmd1 = new Cmd("장바구니를 물건을 담음+1"); Cmd cmd2 = new Cmd("장바구니에 물건을 뺌-0"); Cmd cmd3 = new Cmd("장바구니에 물건을 담음+1"); Cmd cmd4 = new Cmd("장바구니에 물건을 담음+2"); Within(TimeSpan.FromSeconds(cutoffSec), () => { persistentActor.Tell(cmd1); persistentActor.Tell(cmd2); persistentActor.Tell(cmd3); persistentActor.Tell(cmd4); persistentActor.Tell("print"); //현재까지 액터가 가진 이벤트리스트를 재생합니다. Assert.Equal(expectedCount, probe.ExpectMsg<int>()); //액터를 강제로 죽입니다. persistentActor.Tell(Kill.Instance); Task.Delay(500).Wait(); //시스템 셧다운후,재시작 시나리오 //액터를 다시생성하여, 액터가 가진 이벤트가 복구되는지 확인합니다. persistentActor = Sys.ActorOf(Props.Create(() => new MyPersistentActor(probe)), "persistentActor"); persistentActor.Tell("print"); Assert.Equal(expectedCount, probe.ExpectMsg<int>()); }); } } } |
using System; using System.Collections.Immutable; using Akka.Actor; using Akka.Persistence; namespace AkkaNetCore.Actors.Study { #region MessageData public class Shutdown { } //커멘드와 이벤트를 분리합니다. 커멘드는 이벤트를 발생시키는 명령이며 //1 커멘드는 n개의 이벤트로 복제가 될수 있습니다. public class Cmd { public Cmd(string data) { Data = data; } public string Data { get; } } public class Evt { public Evt(string data) { Data = data; } public string Data { get; } } public class ExampleState { private readonly ImmutableList<string> _events; public ExampleState(ImmutableList<string> events) { _events = events; } public ExampleState() : this(ImmutableList.Create<string>()) { } public ExampleState Updated(Evt evt) { return new ExampleState(_events.Add(evt.Data)); } public ExampleState RemoveLastItem() { return new ExampleState(_events.RemoveAt(_events.Count-1)); } public int Size => _events.Count; public override string ToString() { return string.Join(", ", _events.Reverse()); } public int CountInBasket() { int Count = 0; //이벤트를 재생하여 실제 장바구니에 담긴 상품수를 반환한다. foreach(var str in _events) { if (str.Contains("담음")) { Count++; } else if (str.Contains("뺌")) { Count--; } } return Count; } } #endregion #region Actor public class MyPersistentActor : UntypedPersistentActor { private ExampleState _state = new ExampleState(); protected IActorRef probe; public MyPersistentActor(IActorRef probe) { this.probe = probe; } private void UpdateState(Evt evt) { _state = _state.Updated(evt); } private int NumEvents => _state.Size; public override Recovery Recovery => new Recovery(fromSnapshot: SnapshotSelectionCriteria.None); protected override void OnRecover(object message) { switch (message) { case Evt evt: UpdateState(evt); break; case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState: _state = (ExampleState)snapshot.Snapshot; break; } } protected override void OnCommand(object message) { switch (message) { case Cmd cmd: // Command는 두개의 이벤트로 복제될수 있습니다. //Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState); Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt => { UpdateState(evt); Context.System.EventStream.Publish(evt); }); break; case "snap": SaveSnapshot(_state); break; case "print": //모든 이벤트를 보여준다. Console.WriteLine("Try print"); Console.WriteLine(_state); //이벤트 재생시 사이즈를 알려준다.(이벤트 복구 검증용) probe.Tell(_state.CountInBasket()); break; case Shutdown down: probe.Tell("die"); Context.Stop(Self); break; } } public override string PersistenceId { get; } = "sample-id-1"; //영속성을 위한 고유한 아이디값을 가집니다. } #endregion } |