You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

오늘날 우리가 인터넷에서 서비스를 사용하는 방식은 데이터 전송을 비롯한 많은 스트리밍 데이터 인스턴스를 포함하고 있습니다.

그것이 다운로드 서비스이든,영화스트리밍 서비스이든 대용량의 데이터를 우리의 어플리케이션이 가진 메모리와 네트워크대역으로는

순간적으로 처리할수가 없습니다. 스트리밍은 항상 생산자와 소비자로 구분되어 있으며 어떻게 생산자와 소비자의 다양한 속도차이를

고려하여 잘처리하느냐에 대한 고민이 필요하여, Akka Stream은 이러한 스트림 처리를 위한 여러가지 장치를 제공합니다.

그중 유압조절을 할수있는 Throttle 장치에대해 간략하게 알아보고 활용해보겠습니다.

스트림처리

문제

  • 강우량은 측정은 할수 있지만, 예측은 할수 없습니다.  이러한 비가 계속 내리며 저장탱크(버퍼)는 한계가 있습니다.
  • 저장된 탱크의 물을 정제하여, 마실수 있을때가 될때 소비자에게 쉬지않고 계속 공급해야합니다.

스트림을 활용하여 생산된(비)를 지속적으로 소비를 하여 저장탱크가 터지지 않고 소비자에게 물이 공급 하는 것이

이 프로젝트에서 설명하고자하는 주요내용입니다. 

배치처리는 스트림과 반대되는 개념으로 구현난이도는 쉬우나 , 오늘날의 트래픽을 처리하기에는 한계가 분명 존재합니다.

해결 시나리오

  • 비를 받는 저장탱크와, 정수에 사용되는 저장탱크 두개를 준비합니다.
  • A-throttleActor) : A작업자는, 쌓인 빗물을 작업자 탱크에게 지속적(1초) 으로 스트림을 흘려보냅니다. 
  • B-throttleWork) : B작업자는 초당 10ml 마실수 있는 물을 만들어 낼수있으며, 고객에게 서빙을 합니다.
  • C : 소비자는 작업자가 무엇을하는지 관심이 없고 물이 생기면 마십니다.

트래픽에따라 작업자의 안전한 작업량을 제한하고, 필요에 따라 작업자를 조절하고 저장 탱크(버퍼)를 조절할수 있는것이

Akka Stream Flow 가 제공하는 기능들이며, 이것은 작업자가 직접 설계를 해야합니다.

코드 흐름 - 사용

유닛테스트를 활용하여 위와같은 흐름을 어떻게 제어를 하는지 사용부를 먼저 알아 보겠습니다.

복잡한 기능을 단순화하고,유닛테스트를 통해 검증하는것이 웹노리 Akak.net이 추구하는 방향입니다.

unittest
using System;
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.NUnit3;
using AkkaNetCore.Actors.Utils;
using AkkaNetCore.Models.Message;
using NUnit.Framework;

namespace AkkaNetCoreTest.Actors
{
    class ThrottleActorTest : TestKit
    {
        protected TestProbe probe;

        [SetUp]
        public void Setup()
        {
            //스트림을 제공받는 최종 소비자 ( 물을 제공 받는 고객 )
            probe = this.CreateTestProbe();
        }

        [TestCase(15)]
        public void ThrottleActorAreOK(int cutoffSec)
        {
            // 초당 5개 처리한정 ( 더 처리하고 싶으면 이값을 늘린다.)
            int timeSec = 1;
            int elemntPerSec = 5;            
            var throttleActor = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec)));
            var throttleWork = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)));

            // 밸브에게 작업자 지정 ( 밸브는 초당 스트림을 모아서 방출한다 )
            // 작업자는 방류된 스트림을 기본적으로 쌓아두고, 초당 지정된 개수만 처리한다.
            throttleActor.Tell(new SetTarget(throttleWork));            

            // 소비자지정 : 소비자는 몇개가 초당 처리되던 상관없이, 완료된 작업만 제공받는다.
            throttleWork.Tell(new SetTarget(probe));

            Within(TimeSpan.FromSeconds(cutoffSec), () =>
            {
                // 50개 처리완료는 10초이내에 끝나야함...
                for(int i=0; i<50; i++)
                {
                    string seq = (i + 1).ToString();
                    throttleActor.Tell(new Queue(new DelayMsg()
                    {
                        Delay = 0,
                        Seq = seq,
                        Message = $"초당:{elemntPerSec} 테스트-{seq}",
                        State = DelayMsgState.Reserved
                    }));
                }

                DelayMsg lastMessage = null;  //물이라고 가정하자..(빗물->정제->포장 과정 생략)
                for (int i = 0; i < 50; i++)
                {
                    lastMessage =probe.ExpectMsg<DelayMsg>();
                }
                //마지막 메시지의 Seq는 50이여야함
                Assert.AreEqual("50", lastMessage.Seq);

            });
        }
        /* 위 테스트의 결과로그,블락킹없이 실시간으로 초당 5개씩 처리함
            [49:09] -  초당:5 테스트-1
            [49:09] -  초당:5 테스트-2
            [49:09] -  초당:5 테스트-3
            [49:09] -  초당:5 테스트-4
            [49:10] -  초당:5 테스트-5
            [49:10] -  초당:5 테스트-6
            [49:10] -  초당:5 테스트-7
            [49:10] -  초당:5 테스트-8
            [49:10] -  초당:5 테스트-9
            [49:11] -  초당:5 테스트-10         
        */
    }
}


액터구현

using System;
using System.Collections.Immutable;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Monitoring;
using Akka.Streams;
using Akka.Streams.Dsl;
using AkkaNetCore.Models.Message;

namespace AkkaNetCore.Actors.Utils
{
    // 일괄 처리(데이터 인입)    
    public class ThrottleWork : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        private IActorRef consumer;

        public ThrottleWork(int element,int maxBust)
        {
            
            ReceiveAsync<SetTarget>(async target =>
            {
                consumer = target.Ref;
            });

            ReceiveAsync<object>(async message =>
            {
                if (message is Batch batchMessage)
                {
                    int Count = batchMessage.Obj.Count;
                    Context.IncrementMessagesReceived();
                    Source<object, NotUsed> source = Source.From(batchMessage.Obj);

                    using (var materializer = Context.Materializer())
                    {
                        var factorials = source;
                        factorials
                             //.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}")
                             .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
                             .RunForeach(obj => {                                 
                                 var nowstr = DateTime.Now.ToString("mm:ss");
                                 if(obj is DelayMsg delayMsg)
                                 {                                     
                                     Console.WriteLine($"[{nowstr}] -  {delayMsg.Message}");
                                     if (consumer != null) consumer.Tell(delayMsg);
                                 }                                 
                             }, materializer)
                             .Wait();
                    }
                }
            });
        }
    }


    public class ThrottleActor : FSM<State, IData>
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();
        private int CollectSec;

        public ThrottleActor(int _CollectSec)
        {
            CollectSec = _CollectSec;

            StartWith(State.Idle, Uninitialized.Instance);

            When(State.Idle, state =>
            {
                if (state.FsmEvent is SetTarget target && state.StateData is Uninitialized)
                {
                    return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty));
                }

                return null;
            });

            When(State.Active, state =>
            {
                if ((state.FsmEvent is Flush || state.FsmEvent is StateTimeout)
                    && state.StateData is Todo t)
                {
                    return GoTo(State.Idle).Using(t.Copy(ImmutableList<object>.Empty));
                }

                return null;
            }, TimeSpan.FromSeconds(CollectSec));

            WhenUnhandled(state =>
            {
                if (state.FsmEvent is Queue q && state.StateData is Todo t)
                {
                    return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj)));
                }
                else
                {
                    logger.Warning($"Received unhandled request {state.FsmEvent} in state {StateName}/{state.StateData}");
                    return Stay();
                }
            });

            OnTransition((initialState, nextState) =>
            {
                if (initialState == State.Active && nextState == State.Idle)
                {
                    if (StateData is Todo todo)
                    {
                        todo.Target.Tell(new Batch(todo.Queue));
                    }
                    else
                    {
                        // nothing to do
                    }
                }
            });

            Initialize();
        }
    }
}






참고링크:




  • No labels