Page History
Info |
---|
오늘날 우리가 인터넷에서 서비스를 사용하는 방식은 데이터 전송을 비롯한 많은 스트리밍 데이터 인스턴스를 포함하고 있습니다. 그것이 다운로드 서비스이든,영화스트리밍 서비스이든 대용량의 데이터를 우리의 어플리케이션이 가진 메모리와 네트워크대역으로는 순간적으로 처리할수가 없습니다. 스트리밍은 항상 생산자와 소비자로 구분되어 있으며 어떻게 생산자와 소비자의 다양한 속도차이를 고려하여 잘처리하느냐에 대한 고민이 필요하여, Akka Stream은 이러한 스트림 처리를 위한 여러가지 장치를 제공합니다. 그중 유압조절을 할수있는 Throttle 장치에대해 간략하게 알아보고 활용해보겠습니다. |
스트림처리
문제
- 강우량은 측정은 할수 있지만, 정확하게 예측은 할수 없습니다. 이러한 비가 계속 내리며 저장탱크(버퍼)는 오로지 내리는 비만 저장을하며 용량의 한계가 있습니다.
- 비가 내리는중 저장된 탱크의 물을 정제하여, 마실수 있을때가 될때 소비자에게 쉬지않고 계속 지속적으로 소비하여, 효율적으로 고객에게 물을 공급해야합니다.
스트림을 활용하여 생산된(비)를 지속적으로 소비를 하여 저장탱크가 터지지 않고 소비자에게 물이 공급 하는 것이넘치지 않고, 또는 지속적으로 빗물을 소비하여
소비자에게 끊임없이 물을 공급해봅시다. 물론 비는 일정하지는 않지만 계속 내리고 있습니다.이 프로젝트에서 설명하고자하는 주요내용입니다.
배치처리는 스트림과 반대되는 개념으로 구현난이도는 쉬우나 , 오늘날의 트래픽을 처리하기에는 한계가 분명 존재합니다.
...
Akka Stream Flow 가 제공하는 기능들이며, 이것은 작업자가 직접 설계를 해야합니다.실시간 데이터가 버퍼에 넘치지 않도록 지속적으로 활동할수 있으며
저장탱크가 넘치지 않도록 개발자가 직접 설계에 반영해야합니다. ( AKKA가 모든것을 책임지지는 않습니다.)
코드 흐름 - 사용
유닛테스트를 활용하여 위와같은 흐름을 어떻게 제어를 하는지 사용부를 먼저 알아 보겠습니다.
...
Code Block | ||||
---|---|---|---|---|
| ||||
// 빗물을 처리하는 액터(지정된 초마다,빗물을 모아서 전달) var throttleActor = Sys.ActorOf(Props.Create(() => new ThrottleActor(timeSec))); // 물을 생산하는 액터(물생산은 전달받는대로, 초당 5개씩만 처리,느리다 싶으면 Work를 더 늘려준다.) var throttleWork = Sys.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec))); // 생산자와 소비자를 연결합니다 // DI가 사용이되지 않았으며,참조객체 디펀던시가 없습니다. 이것은 네트워크로 작업자가 분리될수 있음을 의미합니다. throttleActor.Tell(new SetTarget(throttleWork)); throttleWork.Tell(new SetTarget(probe)); // 빗물을 빗물은 불규칙적으로 쌓일수 있으나, 빗물작업자에게는 처리자에게어느정도 불규칙적으로모인 계속물이 보낼수 있습니다방류됩니다. throttleActor.Tell(new Queue(/*Any "빗물비" Object*/)); //생산자는 그냥고객은 작업자가 무엇을했는지 관심없습니다. 그냥 플리즈 워터(ExpectMsg)했을때, 지정된 시간에 물을 받게됩니다받으면 됩니다. probe.ExpectMsg<DelayMsg>(); |
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
using System; using System.Collections.Immutable; using Akka; using Akka.Actor; using Akka.Event; using Akka.Monitoring; using Akka.Streams; using Akka.Streams.Dsl; using AkkaNetCore.Models.Message; namespace AkkaNetCore.Actors.Utils { // 일괄 처리(데이터 인입) public class ThrottleWork : ReceiveActor { private readonly ILoggingAdapter logger = Context.GetLogger(); private IActorRef consumer; public ThrottleWork(int element,int maxBust) { ReceiveAsync<SetTarget>(async target => { consumer = target.Ref; }); ReceiveAsync<object>(async message => { if (message is Batch batchMessage) { int Count = batchMessage.Obj.Count; Context.IncrementMessagesReceived(); Source<object, NotUsed> source = Source.From(batchMessage.Obj); using (var materializer = Context.Materializer()) { var factorials = source; factorials //.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}") .Throttle(element, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping) .RunForeach(obj => { var nowstr = DateTime.Now.ToString("mm:ss"); if(obj is DelayMsg delayMsg) { Console.WriteLine($"[{nowstr}] - {delayMsg.Message}"); if (consumer != null) consumer.Tell(delayMsg); } }, materializer) .Wait(); } } }); } } public class ThrottleActor : FSM<State, IData> { private readonly ILoggingAdapter logger = Context.GetLogger(); private int CollectSec; public ThrottleActor(int _CollectSec) { CollectSec = _CollectSec; StartWith(State.Idle, Uninitialized.Instance); When(State.Idle, state => { if (state.FsmEvent is SetTarget target && state.StateData is Uninitialized) { return Stay().Using(new Todo(target.Ref, ImmutableList<object>.Empty)); } return null; }); When(State.Active, state => { if ((state.FsmEvent is Flush || state.FsmEvent is StateTimeout) && state.StateData is Todo t) { return GoTo(State.Idle).Using(t.Copy(ImmutableList<object>.Empty)); } return null; }, TimeSpan.FromSeconds(CollectSec)); WhenUnhandled(state => { if (state.FsmEvent is Queue q && state.StateData is Todo t) { return GoTo(State.Active).Using(t.Copy(t.Queue.Add(q.Obj))); } else { logger.Warning($"Received unhandled request {state.FsmEvent} in state {StateName}/{state.StateData}"); return Stay(); } }); OnTransition((initialState, nextState) => { if (initialState == State.Active && nextState == State.Idle) { if (StateData is Todo todo) { todo.Target.Tell(new Batch(todo.Queue)); } else { // nothing to do } } }); Initialize(); } } } |
클러스터와 함께 Stream은 Akka 에서 Stream처리는 어려운 부분중 하나입니다.
Stream처리에 대하 더 자세한것은 아래 내용을 참고하세요
...
- 05. Streams
- https://getakka.net/articles/streams/introduction.html
- https://alpakka.getakka.net/documentation/index.html