Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
themeEmacs
titleunittest
collapsetrue
using System;
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.NUnit3;
using AkkaNetCore.Actors.Utils;
using AkkaNetCore.Models.Message;
using NUnit.Framework;

namespace AkkaNetCoreTest.Actors
{
    class ThrottleActorTest : TestKit
    {
        protected TestProbe probe;

        [SetUp]
        public void Setup()
        {
            //스트림을 제공받는 최종 소비자 ( 물을 제공 받는 고객 )
            probe = this.CreateTestProbe();
        }

        [TestCase(15)]
        public void ThrottleActorAreOK(int cutoffSec)
        {
            // 초당 5개 처리한정 ( 더 처리하고 싶으면 이값을 늘린다.)
            int timeSec = 1;
            int elemntPerSec = 5;            
            var throttleActor = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec)));
            var throttleWork = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)));

            // 밸브에게 작업자 지정 ( 밸브는 초당 스트림을 모아서 방출한다 )
            // 작업자는 방류된 스트림을 기본적으로 쌓아두고, 초당 지정된 개수만 처리한다.
            throttleActor.Tell(new SetTarget(throttleWork));            

            // 소비자지정 : 소비자는 몇개가 초당 처리되던 상관없이, 완료된 작업만 제공받는다.
            throttleWork.Tell(new SetTarget(probe));

            Within(TimeSpan.FromSeconds(cutoffSec), () =>
            {
                // 50개 처리완료는 10초이내에 끝나야함...
                for(int i=0; i<50; i++)
                {
                    string seq = (i + 1).ToString();
                    throttleActor.Tell(new Queue(new DelayMsg()
                    {
                        Delay = 0,
                        Seq = seq,
                        Message = $"초당:{elemntPerSec} 테스트-{seq}",
                        State = DelayMsgState.Reserved
                    }));
                }

                DelayMsg lastMessage = null;  //물이라고 가정하자..(빗물->정제->포장 과정 생략)
                for (int i = 0; i < 50; i++)
                {
                    lastMessage =probe.ExpectMsg<DelayMsg>();
                }
                //마지막 메시지의 Seq는 50이여야함
                Assert.AreEqual("50", lastMessage.Seq);

            });
        }
        /* 위 테스트의 결과로그,블락킹없이 실시간으로 초당 5개씩 처리함
            [49:09] -  초당:5 테스트-1
            [49:09] -  초당:5 테스트-2
            [49:09] -  초당:5 테스트-3
            [49:09] -  초당:5 테스트-4
            [49:10] -  초당:5 테스트-5
            [49:10] -  초당:5 테스트-6
            [49:10] -  초당:5 테스트-7
            [49:10] -  초당:5 테스트-8
            [49:10] -  초당:5 테스트-9
            [49:11] -  초당:5 테스트-10         
        */
    }
}

액터구현

Code Block
themeEmacs
linenumberstitletrue
collapsetrue
주요코드설명
// 빗물을 처리하는 액터(지정된 초마다 작동)
var throttleActor = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec)));
// 물을 생산하는 액터
var throttleWork = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)));

// 생산자와 소비자를 연결합니다
// DI가 사용이되지 않았으며,참조객체 디펀던시가 없습니다. 이것은 네트워크로 작업자가 분리될수 있음을 의미합니다.
throttleActor.Tell(new SetTarget(throttleWork));
throttleWork.Tell(new SetTarget(probe));

// 빗물을, 빗물 처리자에게 불규칙적으로 계속 보낼수 있습니다.
throttleActor.Tell(new Queue("빗물"));

//생산자는 그냥 물을 받게됩니다.
probe.ExpectMsg<DelayMsg>();


액터구현

Code Block
themeEmacs
titleThrottleActor
linenumberstrue
collapsetrue
using System;
using System.Collections.Immutable;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Monitoring;
using Akka.Streams;
using Akka.Streams.Dsl;
using AkkaNetCore.Models.Message;

namespace AkkaNetCore.Actors.Utils
{
    // 일괄 처리(데이터 인입)    
    public class ThrottleWork : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        private IActorRef consumer;

        public ThrottleWork(int element,int maxBust)
        {
            
            ReceiveAsync<SetTarget>(async target =>
            {
                consumer = target.Ref;
            });

            ReceiveAsync<object>(async message =>
            {
    using System;
using System.Collections.Immutable;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Monitoring;
using Akka.Streams;
using Akka.Streams.Dsl;
using AkkaNetCore.Models.Message;

namespace AkkaNetCore.Actors.Utils
{
    // 일괄 처리(데이터 인입)    
    public class ThrottleWork : ReceiveActor
    {
        private readonly ILoggingAdapter logger =if Context.GetLogger();

  (message is Batch batchMessage)
      private IActorRef consumer;

        public{
 ThrottleWork(int element,int maxBust)
        {
         int Count = batchMessage.Obj.Count;
            ReceiveAsync<SetTarget>(async target =>
      Context.IncrementMessagesReceived();
      {
              Source<object, NotUsed> consumersource = target.RefSource.From(batchMessage.Obj);

            });

           using ReceiveAsync<object>(asyncvar messagematerializer =>
 Context.Materializer())
                    {
                if  (message is Batch batchMessage)
   var factorials = source;
          {
              factorials
      int Count = batchMessage.Obj.Count;
                    Context.IncrementMessagesReceived();
                //.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}")
    Source<object, NotUsed> source = Source.From(batchMessage.Obj);

                    using .Throttle(var materializer = Context.Materializer())element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
                    {
         .RunForeach(obj => {             var factorials = source;
                 
       factorials
                          var nowstr = //.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}")DateTime.Now.ToString("mm:ss");
                             .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
    if(obj is DelayMsg delayMsg)
                               .RunForeach(obj => {                                     
                                  var nowstr = DateTimeConsole.Now.ToStringWriteLine("mm:ss$"[{nowstr}] -  {delayMsg.Message}");
                                     if (objconsumer is!= DelayMsg null) consumer.Tell(delayMsg);
                                 {}                                     
                                     Console.WriteLine($"[{nowstr}] -  {delayMsg.Message}");
           }, materializer)
                         if (consumer != null) consumer.TellWait(delayMsg);
                    }
                }
            });
        }
    }


    public class ThrottleActor : FSM<State, IData>
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();
        private int CollectSec;

        public ThrottleActor(int _CollectSec)
       }, materializer){
            CollectSec = _CollectSec;

               .Wait(StartWith(State.Idle, Uninitialized.Instance);

            When(State.Idle, state   =>
    }
        {
        }
            });
        }
    }


if (state.FsmEvent is SetTarget target && state.StateData is Uninitialized)
       public class ThrottleActor : FSM<State, IData>
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();
        private int CollectSec;

  return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty));
      public ThrottleActor(int _CollectSec)
        {}

             CollectSec   =return _CollectSecnull;

            StartWith(State.Idle, Uninitialized.Instance});

            When(State.IdleActive, state =>
            {
                if ((state.FsmEvent is Flush || state.FsmEvent is SetTarget target StateTimeout)
                    && state.StateData is isTodo Uninitializedt)
                {
                    return StayGoTo(State.Idle).Using(new Todo(target.Ref, t.Copy(ImmutableList<object>.Empty));
                }

                return null;
            }, TimeSpan.FromSeconds(CollectSec));

            WhenWhenUnhandled(State.Active, state =>
            {
                if ((state.FsmEvent is Flush || state.FsmEvent is StateTimeout)
                    FsmEvent is Queue q && state.StateData is Todo t)
                {
                    return GoTo(State.IdleActive).Using(t.Copy(ImmutableList<object>.Emptyt.Queue.Add(q.Obj)));
                }
                else
                {
              return null;
     logger.Warning($"Received unhandled request {state.FsmEvent} in state  }, TimeSpan.FromSeconds(CollectSec));

{StateName}/{state.StateData}");
            WhenUnhandled(state =>
       return  Stay();
   {
             }
   if (state.FsmEvent is Queue q && state.StateData is Todo t});

            OnTransition((initialState,    {nextState) =>
            {
        return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj)));
       if (initialState == State.Active && nextState    }== State.Idle)
                else{
                {
    if (StateData is Todo todo)
             logger.Warning($"Received unhandled request {state.FsmEvent} in state {StateName}/{state.StateData}");
       {
                      return Stay(  todo.Target.Tell(new Batch(todo.Queue));
                    }
              });

      else
      OnTransition((initialState, nextState) =>
            {
                    if (initialState == State.Active && nextState == State.Idle) // nothing to do
                {
    }
                if}
 (StateData is Todo todo)
        });

            {Initialize();
        }
                todo.Target.Tell(new Batch(todo.Queue));
                    }
                    else
                    {
}
}

Code Block
themeEmacs
title핵심코드 설명
// FSM이 사용되었으며, 특정초마다 수집한 데이터를 처리합니다. (스트림 출발점)
When(State.Active, state =>
{
.......	
}, TimeSpan.FromSeconds(CollectSec));

// 빗물담당자는 빗물을 한꺼번에 흘려보냈지만
// 작업자는 안정적으로 초당 5개씩 처리를 하며
// 흘러들어온 버퍼를 꾸준하게 작업한만큼 비웁니다.
Source<object, NotUsed> source = Source.From(batchMessage.Obj);
var factorials = source;
factorials	 
	 .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
	 .RunForeach(obj => {
		 var nowstr = DateTime.Now.ToString("mm:ss");
		 if(obj is DelayMsg delayMsg)
		 {                        // nothing to do
                    }
       
			 Console.WriteLine($"[{nowstr}] -  {delayMsg.Message}");
			 if (consumer != null) consumer.Tell(delayMsg);
		 }         }
            });

            Initialize();
	        }
    }
}}, materializer)
	 .Wait();



참고링크: