You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

이벤트 소싱(및 샤딩) 은 커다란 웹사이트를 수십억명의 사용자 규모로 확장하는 아이디어이며

이아이디어는 매우 간답합니다. 액터가 처리가 될때 명령의 이벤트 목록을 생성하고 저장을 합니다.

이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 

이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 

이벤트 소싱을 실현하기 위해서, AKKA에서 Persistence 가 사용이 됩니다.


이벤트 소싱예제 ( 모든 커멘드를 기록하는 액터)


먼저 전체 소스를 언급하고, 설계된 요소하나하나를 분석해보 겠습니다. 


using System.Collections.Immutable;
using Akka.Persistence;

#region MessageData
public class Shutdown { }

//커멘드와 이벤트를 분리합니다. 커멘드는 이벤트를 발생시키는 명령이며
//1 커멘드는 n개의 이벤트로 복제가 될수 있습니다.
public class Cmd
{
    public Cmd(string data)
    {
        Data = data;
    }

    public string Data { get; }
}

public class Evt
{
    public Evt(string data)
    {
        Data = data;
    }

    public string Data { get; }
}

public class ExampleState
{
    private readonly ImmutableList<string> _events;

    public ExampleState(ImmutableList<string> events)
    {
        _events = events;
    }

    public ExampleState() : this(ImmutableList.Create<string>())
    {
    }

    public ExampleState Updated(Evt evt)
    {
        return new ExampleState(_events.Add(evt.Data));
    }

    public int Size => _events.Count;

    public override string ToString()
    {
        return string.Join(", ", _events.Reverse());
    }
}

#endregion

#region Actor
public class PersistentActor : UntypedPersistentActor
{
    private ExampleState _state = new ExampleState();

    private void UpdateState(Evt evt)
    {
        _state = _state.Updated(evt);
    }

    private int NumEvents => _state.Size;

    public override Recovery Recovery => new Recovery(fromSnapshot: SnapshotSelectionCriteria.None);
    protected override void OnRecover(object message)
    {
        switch (message)
        {
            case Evt evt:
                UpdateState(evt);
                break;
            case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState:
            _state = (ExampleState)snapshot.Snapshot;
                break;
        }
    }

    protected override void OnCommand(object message)
    {
        switch (message)
        {
            case Cmd cmd:
                Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState);
                Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt =>   //이코드는 복제와 추가행동 전략과 관련있습니다.
                {
                    UpdateState(evt);
                    Context.System.EventStream.Publish(evt);
                });
                break;
            case "snap":
                SaveSnapshot(_state);
                break;
            case "print":
                Console.WriteLine("Try print");
                Console.WriteLine(_state);
                break;
            case Shutdown down:
                Context.Stop(Self);
                break;
        }
    }

    public override string PersistenceId { get; } = "sample-id-1";  //영속성을 위한 고유한 아이디값을 가집니다.
}


#endregion


Test Option


akka.persistence.max-concurrent-recoveries = 50 #복구 최고 개수
akka.actor.default-mailbox.stash-capacity = 10000
akka.persistence.internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator"

persitence와 관련하여, 위와같은 옵션을 제공합니다.


TestCode

var actorInfo = Props.Create<PersistentActor>();
var perActor = actorSystem.ActorOf(actorInfo, "myActor");                        
// usage
Cmd cmd1 = new Cmd("장바구니를 물건을 담음-1");
Cmd cmd2 = new Cmd("장바구니에 물건을 담음-2");
Cmd cmd3 = new Cmd("장바구니에 물건을 또담음-3");
Cmd cmd4 = new Cmd("장바구니에 처음 담은 물건을뺌-4");
perActor.Tell(cmd1);
perActor.Tell(cmd2);
perActor.Tell(cmd3);
perActor.Tell(cmd4);
perActor.Tell("print"); //현재까지 액터가 가진 이벤트리스트를 재생합니다.            
waitForTest(1000);

//액터를 강제로 죽입니다.
perActor.Tell(Akka.Actor.Kill.Instance, ActorRefs.NoSender);
waitForTest(1000);

//액터가 복구되는지 확인합니다.
perActor = actorSystem.ActorOf(actorInfo, "myActor");            
perActor.Tell("print");

[DEBUG][2017-09-24 오후 1:59:13][Thread 0009][EventStream(ServiceA)] Logger log1-DefaultLogger [DefaultLogger] started
[DEBUG][2017-09-24 오후 1:59:13][Thread 0009][EventStream(ServiceA)] StandardOutLogger being removed
[DEBUG][2017-09-24 오후 1:59:13][Thread 0009][EventStream(ServiceA)] Default Loggers started
[INFO][2017-09-24 오후 1:59:13][Thread 0009][remoting] Starting remoting
[DEBUG][2017-09-24 오후 1:59:13][Thread 0015][remoting] Starting prune timer for endpoint manager...
[INFO][2017-09-24 오후 1:59:13][Thread 0009][remoting] Remoting started; listening on addresses : [akka.tcp://ServiceA@127.0.0.1:8001]
[INFO][2017-09-24 오후 1:59:13][Thread 0009][remoting] Remoting now listens on addresses: [akka.tcp://ServiceA@127.0.0.1:8001]
[WARNING][2017-09-24 오후 1:59:13][Thread 0009][ActorSystem(ServiceA)] NewtonSoftJsonSerializer has been detected as a default serializer. It will be obsoleted in Akka.NET starting from version 1.5 in the favor of Hyperion (for more info visit: http://getakka.net/docs/Serialization#how-to-setup-hyperion-as-default-serializer ). If you want to suppress this message set HOCON `akka.suppress-json-serializer-warning` config flag to on.
WaitforTest:1000
Try print
장바구니에 처음 담은 물건을뺌-4-3, 장바구니에 물건을 또담음-3-2, 장바구니에 물건을 담음-2-1, 장바구니를 물건을 담음-1-0
WaitforTest:1000
[ERROR][2017-09-24 오후 1:59:14][Thread 0010][akka://ServiceA/user/myActor] Kill
Cause: Akka.Actor.ActorKilledException: Kill
위치: Akka.Actor.ActorCell.AutoReceiveMessage(Envelope envelope)
위치: Akka.Actor.ActorCell.Invoke(Envelope envelope)
Try print - 여기서 액터의 이벤트가 복구됨을 확인할수 있습니다.
장바구니에 처음 담은 물건을뺌-4-3, 장바구니에 물건을 또담음-3-2, 장바구니에 물건을 담음-2-1, 장바구니를 물건을 담음-1-0


 위 설계 코드를 테스트하는 코드는 간단합니다. 우리는 어떠한 커멘드를 비 동기로 보내고, 

모든 커멘드가 print에의해 , 재생할수 있는지 여부를 보는것입니다.

이 전략은 분명 전통적인 개발 방법에서 마지막 상태값을 DB에 저장하는것보다 더 유연한 기능을 제공합니다.

마지막값만 재생할지? 마지막값중 10개까지만 재생할지? 전체를 재생할지?

그것은  max-concurrent-recoveries 옵션을통해

제한 가능합니다. 이것은 우리가 설계한 액터가 죽지않는다는 가정을 하면

이것만으로도 , 응답성이 빠른 기능을 설계할수 있을것입니다.


AKKA는 이러한 액터가 죽을수 있는 상황도 가정을합니다.

AKKA에서 사용하는 모든 액터와 관련한 기능은 그 자체로 단순화된 사용을 제시하지만

반대편에는 복구와 관련된 전략이 몇가지씩 따라옵니다. 그래서 이부분에 대해서는

어렵다라고 설명드리고 싶습니다. 우리는 우리가 설계하는 코드에대해 복구하는 전략을

생각해본적이 없기때문에 그 관점에서는 AKKA는 어렵습니다.


다음장에서는 복구전략과 연동하여   업데이트 혹은 장애발생시 위 샘플에서 한 이벤트 재생값 기능을

특정한 저널을 사용하여 액터의 영속성이 보장되는지 살펴보도록 하겠습니다. 



  • No labels