Page History
Info |
---|
FSM을 간단하게 정리하면 아래와 같습니다.
|
이것은 메시지 처리기에도 적용가능한부분이며, 특정 상태에 따라 메시지를 다르게 처리하는
AKKA에서 제공하는 FSM액터를 재정의 받아서 생성가능합니다이러한 상태머신을 메시지 처리기에 설계하여 적용가능하다란것이 액터가 가진 장점중에 하나입니다.
상태 머신설계
...
해당 액터의 메시지 처리기를 다음과 같이 설계한다고 가정해봅시다.
- 메시지를 일정시간 모아뒀다가 내가 원하는 타이밍에 대상에 보내고 모아서 처리하고 싶다.
- 메시지는 쌓이고 보내는 명령을 하지 않았다면, 특정시간 이내에 쌓인만큼 자동으로 보내고 처리하고 싶다.
이것은 일반적으로, 리스트형태로 변환가능한 동일한 Type의 데이터일시 잦은 Update주기를 제한하고자 할때 많이 사용됩니다.
1초이내에 100번 업데이터되는것보다, 1초동안 모은 100개의 데이터를 뭉쳐서 주는 방법이 처리하는 방식이 훨씬 효율적이기 때문입니다.
상태설계는 아래와 같이 단순합니다.
- 상태값은 대기와 활동상태 두개를 가진다.
- 특정 시간이 지나면 활동상태로 깨어난다깨어나며, 대기큐에서 데이터 처리를 할수 있다.
- 대기이던 활동이던, 데이터 수신은 가능하다.
...
- 수신 가능하여 대기 큐에 적재가된다.
FSM사용을 위한 기본 클래스 설계
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
// received events public class SetTarget { public SetTarget(IActorRef @ref) { Ref = @ref; } public IActorRef Ref { get; } } public class Queue { public Queue(object obj) { Obj = obj; } public Object Obj { get; } } public class Flush { } // send events public class Batch { public Batch(ImmutableList<object> obj) { Obj = obj; } public ImmutableList<object> Obj { get; } } public enum State { Idle, Active } // data public interface IData { } public class Uninitialized : IData { public static Uninitialized Instance = new Uninitialized(); private Uninitialized() { } } public class Todo : IData { public Todo(IActorRef target, ImmutableList<object> queue) { Target = target; Queue = queue; } public IActorRef Target { get; } public ImmutableList<object> Queue { get; } public Todo Copy(ImmutableList<object> queue) { return new Todo(Target, queue); } } |
FSM 액터설계
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class ExampleFSMActor : FSM<State, IData> { private readonly ILoggingAdapter log = Context.GetLogger(); public ExampleFSMActor() { //아래코드는 상태,시작상태 전이상태 정의 가능하며 //StateChart 다이어그램으로 표현이 가능합니다. StartWith(State.Idle, Uninitialized.Instance); When(State.Idle, state => { if( state.FsmEvent is SetTarget && state.StateData is Uninitialized) { SetTarget target = state.FsmEvent as SetTarget; return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty)); } return null; }); When(State.Active, state => { if ( (state.FsmEvent is Flush || state.FsmEvent is StateTimeout) && state.StateData is Todo ) { Todo t = state.StateData as Todo; return GoTo(State.Idle).Using(t.Copy(ImmutableList<object>.Empty)); } return null; }, TimeSpan.FromSeconds(1)); //1초동안 받은 데이터를 처리한다. WhenUnhandled(state => { if (state.FsmEvent is Queue && state.StateData is Todo) { Todo t = state.StateData as Todo; Queue q = state.FsmEvent as Queue; return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj))); } else { log.Warning("Received unhandled request {0} in state {1}/{2}", state.FsmEvent, StateName, state.StateData); return Stay(); } }); OnTransition((initialState, nextState) => { if (initialState == State.Active && nextState == State.Idle) { if (StateData is Todo) { Todo todo = StateData as Todo; todo.Target.Tell(new Batch(todo.Queue)); } else { // nothing to do } } }); Initialize(); } }//end class // 실제 배치를 처리하는 액터 public class FSMTestActor : UntypedActor { private ILoggingAdapter log = Context.GetLogger(); //기본탑재 로그 protected override void OnReceive(object message) { //log.Info("GetMessage:" + message.ToString()); if(message is Batch) { Batch batchMessage = message as Batch; log.Info( "GetMessageCnt:" + batchMessage.Obj.Count.ToString() ); } } } |
테스트 수행
Flush란 : 메시지큐를 비운다란 의미보다, 비동기적으로 적재된 메시지를 처리를 하겠다란 의미입니다.
테스트 의도:
- 메시지를 하나만 보내고, 수동 처리(Flush) 를 한다.
- 메시지를 두개 보내고 , 수동 처리(Flush)한다.
- 메시지를 4개보내고 한꺼번에 자동으로 처리되는지 확인한다.
...
Code Block | ||||
---|---|---|---|---|
| ||||
Code Block | ||||
| ||||
IActorRef target = actorSystem.ActorOf<FSMTestActor>("fsmtestActor"); IActorRef fsmActor = actorSystem.ActorOf<ExampleFSMActor>("fsmActor"); fsmActor.Tell(new SetTarget(target)); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Flush()); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); fsmActor.Tell(new Flush()); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Flush()); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); |
우리가 의도했던대로... 1 ,2 3 개씩 처리하고나서 마지막 4개는 자동으로 처리가 됩니다.
Code Block | ||
---|---|---|
| ||
public class FSMTestActor : UntypedActor
{
private ILoggingAdapter log = Context.GetLogger(); //기본탑재 로그
protected override void OnReceive(object message)
{
//log.Info("GetMessage:" + message.ToString());
if(message is Batch)
{
Batch batchMessage = message as Batch;
log.Info( "GetMessageCnt:" + batchMessage.Obj.Count.ToString() );
}
}
} |
배치처리를 위한 액터는 단지 리스트를 받으면 한꺼번에 처리를 진행하면 되며
순수한 배치처리를 위한 도메인 로직만을 신경 쓰면되겠습니다.
우리가 의도했던대로... 1 ,2 3 개씩 처리하고나서 마지막 4개는 자동으로 처리가 됩니다.
여기서 Count를 알수 있다란 의미는, 불특정하게 발생한 이벤트를 우리가 의도한 룰대로 모아서
리스트 형태의 데이터로 일괄 처리할수 있습니다.
Note |
---|
위 기능이 사용되면 좋은곳
|
원문 참고 링크 : https://getakka.net/articles/actors/finite-state-machine.html