FSM을 간단하게 정리하면 아래와 같습니다.
- 유한 상태 기계는 자신이 취할 수 있는 유한한 갯수의 상태들을 가진다.
- 그리고 그 중에서 반드시 하나의 상태만 취한다.
- 현재 상태는 특정 조건이 되면 다른 상태로 변할 수 있다.
- 유한 상태 기계는 가능한 상태들의 집합과 각 상태들의 전이 조건으로 정의 될 수 있다.
- 상태들의 노드와 그 노드들을 연결하는 조건의 엣지로 표현할 수 있다(그래프)
이것은 메시지 처리기에도 적용가능한부분이며, 특정 상태에 따라 메시지를 다르게 처리하는
AKKA에서 제공하는 FSM액터를 재정의 받아서 생성가능합니다.
상태 머신설계
해당 액터의 메시지 처리기를 다음과 같이 설계한다고 가정해봅시다.
- 메시지를 일정시간 모아뒀다가 내가 원하는 타이밍에 대상에 보내고 싶다.
- 메시지는 쌓이고 보내는 명령을 하지 않았다면, 특정시간 이내에 쌓인만큼 자동으로 보내고 싶다.
이것은 일반적으로, 리스트형태로 변환가능한 동일한 Type의 데이터일시 잦은 Update주기를
제한하고자 할때 많이 사용됩니다. 1초이내에 100번 업데이터되는것보다, 1초동안 모은 100개의 데이터를
뭉쳐서 주는 방법이 훨씬 효율적이기 때문입니다.
상태설계는 아래와 같이 단순합니다.
- 상태값은 대기와 활동상태 두개를 가진다.
- 특정 시간이 지나면 활동상태로 깨어난다.
- 대기이던 활동이던, 데이터 수신은 가능하다.
샘플 데이터 정의
액터설계
public class ExampleFSMActor : FSM<State, IData> { private readonly ILoggingAdapter log = Context.GetLogger(); public ExampleFSMActor() { StartWith(State.Idle, Uninitialized.Instance); When(State.Idle, state => { if( state.FsmEvent is SetTarget && state.StateData is Uninitialized) { SetTarget target = state.FsmEvent as SetTarget; return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty)); } return null; }); When(State.Active, state => { if ( (state.FsmEvent is Flush || state.FsmEvent is StateTimeout) && state.StateData is Todo ) { Todo t = state.StateData as Todo; return GoTo(State.Idle).Using(t.Copy(ImmutableList<object>.Empty)); } return null; }, TimeSpan.FromSeconds(1)); WhenUnhandled(state => { if (state.FsmEvent is Queue && state.StateData is Todo) { Todo t = state.StateData as Todo; Queue q = state.FsmEvent as Queue; return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj))); } else { log.Warning("Received unhandled request {0} in state {1}/{2}", state.FsmEvent, StateName, state.StateData); return Stay(); } }); OnTransition((initialState, nextState) => { if (initialState == State.Active && nextState == State.Idle) { if (StateData is Todo) { Todo todo = StateData as Todo; todo.Target.Tell(new Batch(todo.Queue)); } else { // nothing to do } } }); Initialize(); } }//end class public class FSMTestActor : UntypedActor { private ILoggingAdapter log = Context.GetLogger(); //기본탑재 로그 protected override void OnReceive(object message) { //log.Info("GetMessage:" + message.ToString()); if(message is Batch) { Batch batchMessage = message as Batch; log.Info( "GetMessageCnt:" + batchMessage.Obj.Count.ToString() ); } } }
테스트 수행
Flush란 : 메시지큐를 비운다란 의미보다, 비동기적으로 적재된 메시지를 처리를 하겠다란 의미입니다.
테스트 의도:
- 메시지를 하나만 보내고, 수동 처리(Flush) 를 한다.
- 메시지를 두개 보내고 , 수동 처리(Flush)한다.
- 메시지를 4개보내고 한꺼번에 자동으로 처리되는지 확인한다.
IActorRef target = actorSystem.ActorOf<FSMTestActor>("fsmtestActor"); IActorRef fsmActor = actorSystem.ActorOf<ExampleFSMActor>("fsmActor"); fsmActor.Tell(new SetTarget(target)); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Flush()); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); fsmActor.Tell(new Flush()); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Flush()); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43)); fsmActor.Tell(new Queue(42)); fsmActor.Tell(new Queue(43));
public class FSMTestActor : UntypedActor { private ILoggingAdapter log = Context.GetLogger(); //기본탑재 로그 protected override void OnReceive(object message) { //log.Info("GetMessage:" + message.ToString()); if(message is Batch) { Batch batchMessage = message as Batch; log.Info( "GetMessageCnt:" + batchMessage.Obj.Count.ToString() ); } } }
배치처리를 위한 액터는 단지 리스트를 받으면 한꺼번에 처리를 진행하면 되며
순수한 배치처리를 위한 도메인 로직만을 신경 쓰면되겠습니다.
우리가 의도했던대로... 1 ,2 3 개씩 처리하고나서 마지막 4개는 자동으로 처리가 됩니다.
여기서 Count를 알수 있다란 의미는, 불특정하게 발생한 이벤트를 우리가 의도한 룰대로 모아서
리스트 형태의 데이터로 일괄 처리할수 있습니다.
위 기능이 사용되면 좋은곳
- 불규칙적으로 발생하는 대용량 인입 저장의 성능 향상
- 대용량 인입처리의 분산처리화
- 기타등등