Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

이벤트 소싱(및 샤딩) 은 커다란 웹사이트를 수십억명의 사용자 규모로 확장하는 아이디어이며

이아이디어는 매우 간답합니다. 액터가 처리가 될때 명령의 이벤트 목록을 생성하고 저장을 합니다.

이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 

이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 

이벤트 소싱을 실현하기 위해서, AKKA에서 Persistence 가 사용이 됩니다.


이벤트

...

소싱예제 ( 모든 커멘드를 기록하는 액터)

Code Block
languagec#
themeEmacs
linenumberstrue
using System.Collections.Immutable;
using Akka.Persistence;

#region MessageData
public class Shutdown { }

public class Cmd
{
    public Cmd(string data)
    {
        Data = data;
    }

    public string Data { get; }
}

public class Evt
{
    public Evt(string data)
    {
        Data = data;
    }

    public string Data { get; }
}

public class ExampleState
{
    private readonly ImmutableList<string> _events;

    public ExampleState(ImmutableList<string> events)
    {
        _events = events;
    }

    public ExampleState() : this(ImmutableList.Create<string>())
    {
    }

    public ExampleState Updated(Evt evt)
    {
        return new ExampleState(_events.Add(evt.Data));
    }

    public int Size => _events.Count;

    public override string ToString()
    {
        return string.Join(", ", _events.Reverse());
    }
}

#endregion

#region Actor
public class PersistentActor : UntypedPersistentActor
{
    private ExampleState _state = new ExampleState();

    private void UpdateState(Evt evt)
    {
        _state = _state.Updated(evt);
    }

    private int NumEvents => _state.Size;

    protected override void OnRecover(object message)
    {
        switch (message)
        {
            case Evt evt:
                UpdateState(evt);
                break;
            case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState:
            _state = (ExampleState)snapshot.Snapshot;
                break;
        }
    }

    protected override void OnCommand(object message)
    {
        switch (message)
        {
            case Cmd cmd:
                Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState);
                Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt =>
                {
                    UpdateState(evt);
                    Context.System.EventStream.Publish(evt);
                });
                break;
            case "snap":
                SaveSnapshot(_state);
                break;
            case "print":
                Console.WriteLine("Try print");
                Console.WriteLine(_state);
                break;
            case Shutdown down:
                Context.Stop(Self);
                break;
        }
    }

    public override string PersistenceId { get; } = "sample-id-1";
}

public class MyPersistentActor : UntypedPersistentActor
{
    public override string PersistenceId => "my-stable-persistence-id";

    protected override void OnRecover(object message)
    {
        // handle recovery here
    }

    protected override void OnCommand(object message)
    {
        if (message is string c)
        {
            Sender.Tell(c);
            Persist($"evt-{c}-1", e => Sender.Tell(e));
            Persist($"evt-{c}-2", e => Sender.Tell(e));
            DeferAsync($"evt-{c}-3", e => Sender.Tell(e));
        }
    }
}
#endregion