이벤트 소싱(및 샤딩) 은 커다란 웹사이트를 수십억명의 사용자 규모로 확장하는 아이디어이며
이아이디어는 매우 간답합니다. 액터가 처리가 될때 명령의 이벤트 목록을 생성하고 저장을 합니다.
이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며
이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다.
이벤트 소싱을 실현하기 위해서, AKKA에서 Persistence 가 사용이 됩니다.
이벤트 소싱예제 ( 모든 커멘드를 기록하는 액터)
먼저 전체 소스를 언급하고, 설계된 요소하나하나를 분석해보 겠습니다.
using System.Collections.Immutable; using Akka.Persistence; #region MessageData public class Shutdown { } //커멘드와 이벤트를 분리합니다. 커멘드는 이벤트를 발생시키는 명령이며 //1 커멘드는 n개의 이벤트로 복제가 될수 있습니다. public class Cmd { public Cmd(string data) { Data = data; } public string Data { get; } } public class Evt { public Evt(string data) { Data = data; } public string Data { get; } } public class ExampleState { private readonly ImmutableList<string> _events; public ExampleState(ImmutableList<string> events) { _events = events; } public ExampleState() : this(ImmutableList.Create<string>()) { } public ExampleState Updated(Evt evt) { return new ExampleState(_events.Add(evt.Data)); } public int Size => _events.Count; public override string ToString() { return string.Join(", ", _events.Reverse()); } } #endregion #region Actor public class PersistentActor : UntypedPersistentActor { private ExampleState _state = new ExampleState(); private void UpdateState(Evt evt) { _state = _state.Updated(evt); } private int NumEvents => _state.Size; protected override void OnRecover(object message) { switch (message) { case Evt evt: UpdateState(evt); break; case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState: _state = (ExampleState)snapshot.Snapshot; break; } } protected override void OnCommand(object message) { switch (message) { case Cmd cmd: Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState); Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt => //이코드는 복제와 추가행동 전략과 관련있습니다. { UpdateState(evt); Context.System.EventStream.Publish(evt); }); break; case "snap": SaveSnapshot(_state); break; case "print": Console.WriteLine("Try print"); Console.WriteLine(_state); break; case Shutdown down: Context.Stop(Self); break; } } public override string PersistenceId { get; } = "sample-id-1"; //영속성을 위한 고유한 아이디값을 가집니다. } #endregion
Test Option
akka.persistence.max-concurrent-recoveries = 50 #복구 최고 개수
akka.actor.default-mailbox.stash-capacity = 10000
akka.persistence.internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator"
persitence와 관련하여, 위와같은 옵션을 제공합니다.
TestCode
var actorInfo = Props.Create<PersistentActor>(); var perActor = actorSystem.ActorOf(actorInfo, "myActor"); // usage Cmd cmd1 = new Cmd("장바구니를 염"); Cmd cmd2 = new Cmd("장바구니에 물건을 담음"); Cmd cmd3 = new Cmd("장바구니에 물건을 또담음"); Cmd cmd4 = new Cmd("장바구니에 처음 담은 물건을뺌"); perActor.Tell(cmd1); perActor.Tell(cmd2); perActor.Tell(cmd3); perActor.Tell(cmd4); perActor.Tell("print"); //현재까지 액터가 가진 이벤트리스트를 재생합니다.
위 설계 코드를 테스트하는 코드는 간단합니다. 우리는 어떠한 커멘드를 비 동기로 보내고,
모든 커멘드가 print에의해 , 재생할수 있는지 여부를 보는것입니다.
이 전략은 분명 전통적인 개발 방법에서 마지막 상태값을 DB에 저장하는것보다 더 유연한 기능을 제공합니다.
다음장에서는 복구전략과 연동하여 업데이트 혹은 장애발생시 어떻게 이러한 재생값을
복구하여 액터의 영속성이 보장되는지 살펴보도록 하겠습니다.