Page History
...
Code Block | ||||
---|---|---|---|---|
| ||||
// 빗물을 처리하는 액터(지정된 초마다 작동) var throttleActor = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec))); // 물을 생산하는 액터 var throttleWork = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec))); // 생산자와 소비자를 연결합니다 // DI가 사용이되지 않았으며,참조객체 디펀던시가 없습니다. 이것은 네트워크로 작업자가 분리될수 있음을 의미합니다. throttleActor.Tell(new SetTarget(throttleWork)); throttleWork.Tell(new SetTarget(probe)); // 빗물을, 빗물 처리자에게 불규칙적으로 계속 보낼수 있습니다. throttleActor.Tell(new Queue("빗물")); //생산자는 그냥 물을 받게됩니다. probe.ExpectMsg<DelayMsg>(); |
액터구현
ThrottleActor 는 이 장의 설명을 위해, 작성하였으나 데이터 처리 대상의 제약을 두지 않았기때문에 다양하게 활용가능합니다.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
using System; using System.Collections.Immutable; using Akka; using Akka.Actor; using Akka.Event; using Akka.Monitoring; using Akka.Streams; using Akka.Streams.Dsl; using AkkaNetCore.Models.Message; namespace AkkaNetCore.Actors.Utils { // 일괄 처리(데이터 인입) public class ThrottleWork : ReceiveActor { private readonly ILoggingAdapter logger = Context.GetLogger(); private IActorRef consumer; public ThrottleWork(int element,int maxBust) { ReceiveAsync<SetTarget>(async target => { consumer = target.Ref; }); ReceiveAsync<object>(async message => { if (message is Batch batchMessage) { int Count = batchMessage.Obj.Count; Context.IncrementMessagesReceived(); Source<object, NotUsed> source = Source.From(batchMessage.Obj); using (var materializer = Context.Materializer()) { var factorials = source; factorials //.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}") .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping) .RunForeach(obj => { var nowstr = DateTime.Now.ToString("mm:ss"); if(obj is DelayMsg delayMsg) { Console.WriteLine($"[{nowstr}] - {delayMsg.Message}"); if (consumer != null) consumer.Tell(delayMsg); } }, materializer) .Wait(); } } }); } } public class ThrottleActor : FSM<State, IData> { private readonly ILoggingAdapter logger = Context.GetLogger(); private int CollectSec; public ThrottleActor(int _CollectSec) { CollectSec = _CollectSec; StartWith(State.Idle, Uninitialized.Instance); When(State.Idle, state => { if (state.FsmEvent is SetTarget target && state.StateData is Uninitialized) { return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty)); } return null; }); When(State.Active, state => { if ((state.FsmEvent is Flush || state.FsmEvent is StateTimeout) && state.StateData is Todo t) { return GoTo(State.Idle).Using(t.Copy(ImmutableList<object>.Empty)); } return null; }, TimeSpan.FromSeconds(CollectSec)); WhenUnhandled(state => { if (state.FsmEvent is Queue q && state.StateData is Todo t) { return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj))); } else { logger.Warning($"Received unhandled request {state.FsmEvent} in state {StateName}/{state.StateData}"); return Stay(); } }); OnTransition((initialState, nextState) => { if (initialState == State.Active && nextState == State.Idle) { if (StateData is Todo todo) { todo.Target.Tell(new Batch(todo.Queue)); } else { // nothing to do } } }); Initialize(); } } } |
Code Block | ||||
---|---|---|---|---|
| ||||
// AkkaStream을 Actor내에 사용하기 위해 준비되는코드입니다.var materializer = Context.Materializer(); // FSM이 사용되었으며, 특정초마다 수집한 데이터를 처리합니다. (스트림 출발점) When(State.Active, state => { ....... }, TimeSpan.FromSeconds(CollectSec)); // 빗물담당자는 빗물을 한꺼번에 흘려보냈지만 // 작업자는 안정적으로 초당 5개씩5개 처리를 하며 //처리 제약을두며 흘러들어온 버퍼를 꾸준하게 작업한만큼 비웁니다. // 처리대상 소스 :한꺼번에 몰려욤 Source<object, NotUsed> source = Source.From(batchMessage.Obj); var factorials = source; factorials // 조절기를 달아서 천천히 처리,가속 기울기를 선택할수 있는것은 보너스입니다. source.Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping) .RunForeach(obj => { var nowstr = DateTime.Now.ToString("mm:ss"); if(obj is DelayMsg delayMsg) { Console.WriteLine($"[{nowstr}] - {delayMsg.Message}"); if (consumer != null) consumer.Tell(delayMsg); } }, materializer) .Wait(); |
참고링크: