Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

이벤트 소싱(및 샤딩) 은 커다란 웹사이트를 수십억명의 사용자 규모로 확장하는 아이디어이며

이아이디어는 매우 간답합니다. 메시지가 발생할때 명령의 이벤트 목록을 생성하고 저장을 합니다.

이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 

이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 

이벤트 소싱을 실현하기 위해서, AKKA에서 PersistenceActor 가 사용이 됩니다.


 장바구니를 예를 들어봅시다. 사용자는 물건 구매전에 이것저것 상품을 장바구니에 담습니다.

그리고 잠깐 다른 볼일을 보다가, 사용자는 컴퓨터를 다시 켰습니다. 사용자는 자신이 담은 그 순서대로

장바구니를 볼수가 있으며,  이순간 서버업데이트가 발생하여 서비스 재시작이 되었습니다.

사용자는 그래도 빈화면이 아닌 자신의 장바구니를 그대로 볼수가 있으며

구매 결정시 장바구니를 비울수가 있습니다. 이것이 이벤트 소싱의 가장 간단한 예입니다.

모든 이벤트를 기록을 하고, 이 이벤트는 언제든 분산처리되어 복제될수 있으며

복제가 될수있다란 의미는, 대용량 처리를위해 언제든 분산 처리 전략을 세울수 있다란 것입니다. 

이벤트사용이 끝나고 더이상 필요가 없을시 또한 즉각 비울수가 있습니다.

트위터 같은 서비스가 이러한 전략으로 수십억명에게 메시지를 전달할수가 있습니다.

액터설계-모든 커멘드를 기록하는 액터

액터 설계 Point

  • 메시지 설계는 명령(cmd)와 발생하는 이벤트(evt) 를 분리하여 설계합니다.
  • UntypedPersistentActor 를 상속받아서 자신만의 PersitentActor를 설계합니다.

  • OnCommand 에서 메시지 처리기를 설계합니다.

    • 명령과 관련있는 이벤트를 저장합니다.  Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState)

  • override Recovery Recovery 를 통해 복구기를 지정합니다.
  • protected override void OnRecover 를 통해 복구가 일어날때의 코드를 재정의합니다.
  • EventList 는 ImmutableList를 사용하여, 업데이트 될시 Event를 추가만 하면됩니다.


Code Block
languagec#
themeEmacs
linenumberstrue
using System.Collections.Immutable;
using Akka.Persistence;

#region MessageData
public class Shutdown { }

//커멘드와 이벤트를 분리합니다. 커멘드는 이벤트를 발생시키는 명령이며
//1 커멘드는 n개의 이벤트로 복제가 될수 있습니다.
public class Cmd
{
    public Cmd(string data)
    {
        Data = data;
    }

    public string Data { get; }
}

public class Evt
{
    public Evt(string data)
    {
        Data = data;
    }

    public string Data { get; }
}

public class ExampleState
{
    private readonly ImmutableList<string> _events;

    public ExampleState(ImmutableList<string> events)
    {
        _events = events;
    }

    public ExampleState() : this(ImmutableList.Create<string>())
    {
    }

    public ExampleState Updated(Evt evt)
    {
        return new ExampleState(_events.Add(evt.Data));
    }

    public int Size => _events.Count;

    public override string ToString()
    {
        return string.Join(", ", _events.Reverse());
    }
}

#endregion

#region Actor
public class PersistentActor : UntypedPersistentActor
{
    private ExampleState _state = new ExampleState();

    private void UpdateState(Evt evt)
    {
        _state = _state.Updated(evt);
    }

    private int NumEvents => _state.Size;

    public override Recovery Recovery => new Recovery(fromSnapshot: SnapshotSelectionCriteria.None);
    protected override void OnRecover(object message)
    {
        switch (message)
        {
            case Evt evt:
                UpdateState(evt);
                break;
            case SnapshotOffer snapshot when snapshot.Snapshot is ExampleState:
            _state = (ExampleState)snapshot.Snapshot;
                break;
        }
    }

    protected override void OnCommand(object message)
    {
        switch (message)
        {
            case Cmd cmd:
                Persist(new Evt($"{cmd.Data}-{NumEvents}"), UpdateState);
                Persist(new Evt($"{cmd.Data}-{NumEvents + 1}"), evt =>   //이코드는 복제와 추가행동 전략과 관련있습니다.
                {
                    UpdateState(evt);
                    Context.System.EventStream.Publish(evt);
                });
                break;
            case "snap":
                SaveSnapshot(_state);
                break;
            case "print":
                Console.WriteLine("Try print");
                Console.WriteLine(_state);
                break;
            case Shutdown down:
                Context.Stop(Self);
                break;
        }
    }

    public override string PersistenceId { get; } = "sample-id-1";  //영속성을 위한 고유한 아이디값을 가집니다.
}


#endregion


Test Option

Panel


akka.persistence.max-concurrent-recoveries = 50 #복구 최고 개수
akka.actor.default-mailbox.stash-capacity = 10000
akka.persistence.internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator"

persitence와 관련하여, 위와같은 옵션을 제공합니다.


TestCode


 위 설계 코드를 테스트하는 코드는 간단합니다. 우리는 어떠한 커멘드를 비 동기로 보내고, 

모든 커멘드가 print에의해 , 재생할수 있는지 여부를 보는것입니다.

이 전략은 분명 전통적인 개발 방법에서 마지막 상태값을 DB에 저장하는것보다 더 유연한 기능을 제공합니다.

마지막값만 재생할지? 마지막값중 10개까지만 재생할지? 전체를 재생할지?

그것은  max-concurrent-recoveries 옵션을통해

제한 가능합니다. 이것은 우리가 설계한 액터가 죽지않는다는 가정을 하면

이것만으로도 , 응답성이 빠른 기능을 설계할수 있을것이며, 실제로 장애로인해

이벤트 소스가 리스타트가 되더라도 모든 이벤트가 복구가 될것입니다.


Code Block
languagec#
themeEmacs
linenumberstrue
var actorInfo = Props.Create<PersistentActor>();
var perActor = actorSystem.ActorOf(actorInfo, "myActor");                        
// usage
Cmd cmd1 = new Cmd("장바구니를 물건을 담음-1");
Cmd cmd2 = new Cmd("장바구니에 물건을 담음-2");
Cmd cmd3 = new Cmd("장바구니에 물건을 또담음-3");
Cmd cmd4 = new Cmd("장바구니에 처음 담은 물건을뺌-4");
perActor.Tell(cmd1);
perActor.Tell(cmd2);
perActor.Tell(cmd3);
perActor.Tell(cmd4);
perActor.Tell("print"); //현재까지 액터가 가진 이벤트리스트를 재생합니다.            
waitForTest(1000);

//액터를 강제로 죽입니다.
perActor.Tell(Akka.Actor.Kill.Instance, ActorRefs.NoSender);
waitForTest(1000);

//액터가 복구되는지 확인합니다.
perActor = actorSystem.ActorOf(actorInfo, "myActor");            
perActor.Tell("print");

protected void waitForTest(int time = 1000)    //비동기 테스트를 위해 기다림
{
    Console.WriteLine("WaitforTest:" + time);
    Task.Delay(time).Wait();
}
Expand
titleresult

[DEBUG][2017-09-24 오후 1:59:13][Thread 0009][EventStream(ServiceA)] Logger log1-DefaultLogger [DefaultLogger] started
[DEBUG][2017-09-24 오후 1:59:13][Thread 0009][EventStream(ServiceA)] StandardOutLogger being removed
[DEBUG][2017-09-24 오후 1:59:13][Thread 0009][EventStream(ServiceA)] Default Loggers started
[INFO][2017-09-24 오후 1:59:13][Thread 0009][remoting] Starting remoting
[DEBUG][2017-09-24 오후 1:59:13][Thread 0015][remoting] Starting prune timer for endpoint manager...
[INFO][2017-09-24 오후 1:59:13][Thread 0009][remoting] Remoting started; listening on addresses : [akka.tcp://ServiceA@127.0.0.1:8001]
[INFO][2017-09-24 오후 1:59:13][Thread 0009][remoting] Remoting now listens on addresses: [akka.tcp://ServiceA@127.0.0.1:8001]
[WARNING][2017-09-24 오후 1:59:13][Thread 0009][ActorSystem(ServiceA)] NewtonSoftJsonSerializer has been detected as a default serializer. It will be obsoleted in Akka.NET starting from version 1.5 in the favor of Hyperion (for more info visit: http://getakka.net/docs/Serialization#how-to-setup-hyperion-as-default-serializer ). If you want to suppress this message set HOCON `akka.suppress-json-serializer-warning` config flag to on.
WaitforTest:1000
Try print
장바구니에 처음 담은 물건을뺌-4-3, 장바구니에 물건을 또담음-3-2, 장바구니에 물건을 담음-2-1, 장바구니를 물건을 담음-1-0
WaitforTest:1000
[ERROR][2017-09-24 오후 1:59:14][Thread 0010][akka://ServiceA/user/myActor] Kill
Cause: Akka.Actor.ActorKilledException: Kill
위치: Akka.Actor.ActorCell.AutoReceiveMessage(Envelope envelope)
위치: Akka.Actor.ActorCell.Invoke(Envelope envelope)
Try print - 여기서 액터의 이벤트가 복구됨을 확인할수 있습니다.
장바구니에 처음 담은 물건을뺌-4-3, 장바구니에 물건을 또담음-3-2, 장바구니에 물건을 담음-2-1, 장바구니를 물건을 담음-1-0



Info

AKKA는 이러한 액터가 죽을수 있는 상황도 가정을합니다.

AKKA에서 사용하는 모든 액터와 관련한 기능은 그 자체로 단순화된 사용을 제시하지만

반대편에는 복구와 관련된 전략이 몇가지씩 따라옵니다. 그래서 이부분에 대해서는

어렵다라고 설명드리고 싶습니다. 우리는 우리가 설계하는 코드에대해 복구하는 전략을

생각해본적이 없기때문에 그 관점에서는 AKKA는 어렵습니다.


다음장에서는 복구전략과 연동하여   업데이트 혹은 장애발생시 위 샘플에서 한 이벤트 재생값 기능을

특정한 저널을 사용하여 액터의 영속성이 보장되는지 살펴보도록 하겠습니다. 

...