Page History
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
using System;
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.NUnit3;
using AkkaNetCore.Actors.Utils;
using AkkaNetCore.Models.Message;
using NUnit.Framework;

namespace AkkaNetCoreTest.Actors
{
    /// <summary>
    /// Exercises the ThrottleActor -> ThrottleWork -> probe pipeline and checks
    /// that every queued message reaches the probe, in order, within the cutoff.
    /// </summary>
    class ThrottleActorTest : TestKit
    {
        protected TestProbe probe;

        [SetUp]
        public void Setup()
        {
            // Final consumer of the stream (the customer who receives the water).
            probe = this.CreateTestProbe();
        }

        /// <summary>
        /// Queues 50 messages on the throttle actor and expects all 50 on the probe.
        /// </summary>
        /// <param name="cutoffSec">Time budget, in seconds, for the whole send/receive exchange.</param>
        [TestCase(15)]
        public void ThrottleActorAreOK(int cutoffSec)
        {
            const int messageCount = 50;

            // Processing is limited to 5 items per second (raise this value to process more).
            int timeSec = 1;
            int elemntPerSec = 5;

            IActorRef valve = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec)));
            IActorRef worker = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)));

            // Assign the worker to the valve (the valve collects the stream and releases it).
            // The worker buffers whatever was released and processes only the configured
            // number of items per second.
            valve.Tell(new SetTarget(worker));

            // Assign the consumer: regardless of how many items are processed per second,
            // the consumer only ever receives finished work.
            worker.Tell(new SetTarget(probe));

            TimeSpan budget = TimeSpan.FromSeconds(cutoffSec);
            Within(budget, () =>
            {
                // Processing all 50 items should finish within about 10 seconds...
                for (int number = 1; number <= messageCount; number++)
                {
                    string seq = number.ToString();
                    var payload = new DelayMsg()
                    {
                        Delay = 0,
                        Seq = seq,
                        Message = $"초당:{elemntPerSec} 테스트-{seq}",
                        State = DelayMsgState.Reserved
                    };
                    valve.Tell(new Queue(payload));
                }

                // Think of each message as water (rain -> purification -> packaging is omitted).
                DelayMsg lastMessage = null;
                int received = 0;
                while (received < messageCount)
                {
                    lastMessage = probe.ExpectMsg<DelayMsg>();
                    received++;
                }

                // The Seq of the last message must be 50.
                Assert.AreEqual("50", lastMessage.Seq);
            });
        }

        /*
         Log output of the test above: 5 items per second are processed in real time, without blocking.
         [49:09] - 초당:5 테스트-1
         [49:09] - 초당:5 테스트-2
         [49:09] - 초당:5 테스트-3
         [49:09] - 초당:5 테스트-4
         [49:10] - 초당:5 테스트-5
         [49:10] - 초당:5 테스트-6
         [49:10] - 초당:5 테스트-7
         [49:10] - 초당:5 테스트-8
         [49:10] - 초당:5 테스트-9
         [49:11] - 초당:5 테스트-10
         */
    }
}
액터 사용 예
Code Block | |||||||
---|---|---|---|---|---|---|---|
| |||||||
// Valve actor that collects the incoming "rain water"; timeSec is its collection timeout in seconds.
var throttleActor = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec)));
// Worker actor that produces the "water": it processes released items at a throttled rate
// (elemntPerSec items per second; the second argument is used as the throttle's maximum burst).
var throttleWork = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)));
// Connect the stages: valve -> worker -> consumer.
// No DI is used and there is no object-reference dependency between the stages,
// which means the workers could be separated across the network.
throttleActor.Tell(new SetTarget(throttleWork));
throttleWork.Tell(new SetTarget(probe));
// Rain water can be sent to the valve continuously and at irregular intervals.
// The payload has to be a DelayMsg: the worker only forwards DelayMsg items to its consumer,
// so a bare string would never reach the probe.
throttleActor.Tell(new Queue(new DelayMsg() { Message = "빗물" }));
// The consumer simply receives the finished water.
probe.ExpectMsg<DelayMsg>();
액터구현
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
using System;
using System.Collections.Immutable;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Monitoring;
using Akka.Streams;
using Akka.Streams.Dsl;
using AkkaNetCore.Models.Message;

namespace AkkaNetCore.Actors.Utils
{
    /// <summary>
    /// Batch worker: receives a <c>Batch</c> of items and pushes them through a
    /// throttled Akka stream, forwarding every <c>DelayMsg</c> item to the configured consumer.
    /// </summary>
    public class ThrottleWork : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        // Receiver of processed items; stays null until a SetTarget message arrives.
        private IActorRef consumer;

        /// <summary>
        /// Registers the message handlers.
        /// </summary>
        /// <param name="element">Number of items let through per one-second throttle window.</param>
        /// <param name="maxBust">Maximum burst size handed to the stream's Throttle stage.</param>
        public ThrottleWork(int element, int maxBust)
        {
            // Purely synchronous assignment, so a plain Receive handler is sufficient.
            Receive<SetTarget>(target =>
            {
                consumer = target.Ref;
            });

            ReceiveAsync<object>(async message =>
            {
                // Anything that is not a Batch is silently ignored.
                if (!(message is Batch batchMessage))
                {
                    return;
                }

                Context.IncrementMessagesReceived();
                Source<object, NotUsed> source = Source.From(batchMessage.Obj);

                using (var materializer = Context.Materializer())
                {
                    // Awaiting (rather than blocking with Wait()) keeps the dispatcher thread free
                    // while the stream drains; the materializer is disposed only after completion.
                    await source
                        .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
                        .RunForeach(obj =>
                        {
                            var nowstr = DateTime.Now.ToString("mm:ss");
                            if (obj is DelayMsg delayMsg)
                            {
                                Console.WriteLine($"[{nowstr}] - {delayMsg.Message}");
                                if (consumer != null)
                                {
                                    consumer.Tell(delayMsg);
                                }
                            }
                        }, materializer);
                }
            });
        }
    }

    /// <summary>
    /// Collecting valve implemented as an FSM: queues incoming <c>Queue</c> payloads while
    /// Active and hands them to the target as a single <c>Batch</c> when it returns to Idle.
    /// </summary>
    public class ThrottleActor : FSM<State, IData>
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        // State timeout of the Active state, in seconds.
        private readonly int CollectSec;

        /// <summary>
        /// Sets up the state machine, starting in Idle with no target configured.
        /// </summary>
        /// <param name="_CollectSec">Seconds used as the Active state's timeout.</param>
        public ThrottleActor(int _CollectSec)
        {
            CollectSec = _CollectSec;

            StartWith(State.Idle, Uninitialized.Instance);

            // Idle: the only event handled here is the initial target assignment.
            When(State.Idle, state =>
            {
                if (state.FsmEvent is SetTarget target && state.StateData is Uninitialized)
                {
                    return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty));
                }

                return null;
            });

            // Active: an explicit Flush or the state timeout empties the queue and returns to Idle.
            When(State.Active, state =>
            {
                if ((state.FsmEvent is Flush || state.FsmEvent is StateTimeout) && state.StateData is Todo t)
                {
                    return GoTo(State.Idle).Using(t.Copy(ImmutableList<object>.Empty));
                }

                return null;
            }, TimeSpan.FromSeconds(CollectSec));

            // Queue is handled identically in every state: append the payload and go (or stay) Active.
            WhenUnhandled(state =>
            {
                if (state.FsmEvent is Queue q && state.StateData is Todo t)
                {
                    return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj)));
                }

                logger.Warning($"Received unhandled request {state.FsmEvent} in state {StateName}/{state.StateData}");
                return Stay();
            });

            // On Active -> Idle, emit what was collected. This relies on StateData still being the
            // pre-transition data inside the handler (per the Akka FSM documentation).
            OnTransition((initialState, nextState) =>
            {
                if (initialState == State.Active && nextState == State.Idle && StateData is Todo todo)
                {
                    todo.Target.Tell(new Batch(todo.Queue));
                }
            });

            Initialize();
        }
    }
}
Code Block | ||||
---|---|---|---|---|
| ||||
// An FSM is used: the data collected so far is handed over whenever the Active state's
// timeout (CollectSec seconds) fires. This is the starting point of the stream.
When(State.Active, state =>
{
    // ... (flush handling elided)
}, TimeSpan.FromSeconds(CollectSec));

// The rain-water valve released everything at once, but the worker steadily
// processes 5 items per second and drains the buffered batch at that pace.
Source<object, NotUsed> source = Source.From(batchMessage.Obj);

// Awaited inside the async message handler so the stream is drained without blocking a thread.
await source
    .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
    .RunForeach(obj =>
    {
        var nowstr = DateTime.Now.ToString("mm:ss");
        if (obj is DelayMsg delayMsg)
        {
            Console.WriteLine($"[{nowstr}] - {delayMsg.Message}");
            if (consumer != null)
            {
                consumer.Tell(delayMsg);
            }
        }
    }, materializer);
참고링크: