Page History
Info |
---|
이벤트 소싱(및 샤딩) 은 커다란 웹사이트를 수십억명의 사용자 규모로 확장하는 아이디어이며 이아이디어는 매우 간답합니다. 액터가 처리가 될때 명령의 이벤트 목록을 생성하고 저장을 합니다. 이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 이벤트 소싱을 실현하기 위해서, AKKA에서 Persistence 가 사용이 됩니다. |
이벤트
...
소싱예제 ( 모든 커멘드를 기록하는 액터)
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
using System.Collections.Immutable;
using Akka.Persistence;
#region MessageData
public class Shutdown { }
public class Cmd
{
public Cmd(string data)
{
Data = data;
}
public string Data { get; }
}
public class Evt
{
public Evt(string data)
{
Data = data;
}
public string Data { get; }
}
public class ExampleState
{
private readonly ImmutableList<string> _events;
public ExampleState(ImmutableList<string> events)
{
_events = events;
}
public ExampleState() : this(ImmutableList.Create<string>())
{
}
public ExampleState Updated(Evt evt)
{
return new ExampleState(_events.Add(evt.Data));
}
public int Size => _events.Count;
public override string ToString()
{
return string.Join(", ", _events.Reverse());
}
}
#endregion
#region Actor
public class PersistentActor : UntypedPersistentActor
{
private ExampleState _state = new ExampleState();
private void UpdateState(Evt evt)
{
_state = _state.Updated(evt);
}
private int NumEvents => _state.Size;
protected override void OnRecover(object message)
{
switch (message)
{
case Evt evt:
UpdateState(evt);
break;
case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState:
_state = (ExampleState)snapshot.Snapshot;
break;
}
}
protected override void OnCommand(object message)
{
switch (message)
{
case Cmd cmd:
Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState);
Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt =>
{
UpdateState(evt);
Context.System.EventStream.Publish(evt);
});
break;
case "snap":
SaveSnapshot(_state);
break;
case "print":
Console.WriteLine("Try print");
Console.WriteLine(_state);
break;
case Shutdown down:
Context.Stop(Self);
break;
}
}
public override string PersistenceId { get; } = "sample-id-1";
}
public class MyPersistentActor : UntypedPersistentActor
{
public override string PersistenceId => "my-stable-persistence-id";
protected override void OnRecover(object message)
{
// handle recovery here
}
protected override void OnCommand(object message)
{
if (message is string c)
{
Sender.Tell(c);
Persist($"evt-{c}-1", e => Sender.Tell(e));
Persist($"evt-{c}-2", e => Sender.Tell(e));
DeferAsync($"evt-{c}-3", e => Sender.Tell(e));
}
}
}
#endregion |