이벤트를 분산처리하기 위해 다음과 같은 잘 알려진 라우팅을 활용할수  있으며

  • Round Robbin : 순차 분배합니다.
  • Random : 랜덤 분배
  • BroadCast : 분배없이 전체 발송
  • MailBox : 덜바쁜 작업자에게 우선발송


분산처리 장치인 라우터는 다음과 같은 요소를 가지고 있습니다.

  • 라우터 : 분배가 발생하는 지점 또는 장치
  • 라우팅 : 분배의 방식 ( Robund Bobbin과 같은 분배로직 )
  • 라우티 : 분배가 도달하는 최종 위치로 최종 작업자에게 연결됨


단순하게 라우티가 많다고 안정적이고 빠르게 처리할수 있는것이 아닌 분배기에 조절기를 연결함으로 흐름을 제어할수 있습니다.

안정적인 흐름제어는,  유체(Stream)흐름 제어장치 에서 아이디어를 얻은것으로  Akka에서 제공하는 장치를 이용하여

복잡한 스레드/네트워크 프로그래밍 필요없이 닷넷및 자바를 이용하여  배압(Back Pressure)장치와 같은 흐름제어 장치를 구현할수 있습니다.



// Create ActorSystem
var actorSystem = akkaService.CreateActorSystem();

// Create RoundRobin Router
var roundrobin = actorSystem.ActorOf(Props.Create<BasicActor>().WithRouter(new RoundRobinPool(0)));

// Create Throttle And connect Router
int tps = 1;
var throttle = actorSystem.ActorOf(Props.Create(() => new ThrottleActor(tps)));
throttle.Tell(new SetTarget(roundrobin));

// Create Worker and Add Routee
var workActor = ActorSystem.ActorOf(Props.Create<BasicActor>(), NodeName);
var routee = Routee.FromActorRef(workActor);
roundrobin.Tell(new AddRoutee(routee));

// Say Hello
throttle.Tell(new TodoQueue()
{
    Todo = new Todo()
    {
        Id = i.ToString(),
        Title = ""Hello""                        
    }
});
ThrottleActor
    public class ThrottleActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        private IActorRef? consumer;

        private IActorRef _throttler;

        private readonly IMaterializer _materializer;

        private int _processCouuntPerSec;

        public ThrottleActor(int processCouuntPerSec)
        {
            _materializer = Context.Materializer();

            _processCouuntPerSec = processCouuntPerSec;

            _throttler =
                Source.ActorRef<object>(1000, OverflowStrategy.DropNew)
                      .Throttle(_processCouuntPerSec, TimeSpan.FromSeconds(1), _processCouuntPerSec, ThrottleMode.Shaping)
                      .To(Sink.ActorRef<object>(Self, NotUsed.Instance))
                      .Run(_materializer);

            ReceiveAsync<SetTarget>(async target =>
            {
                consumer = target.Ref;
            });


            ReceiveAsync<TPSInfoReq>(async target =>
            {
                Sender.Tell(_processCouuntPerSec);
            });

            ReceiveAsync<ChangeTPS>(async msg =>
            {
                var oldThrottler = _throttler;

                _processCouuntPerSec = msg.processCouuntPerSec;

                _throttler =
                    Source.ActorRef<object>(1000, OverflowStrategy.DropNew)
                            .Throttle(_processCouuntPerSec, TimeSpan.FromSeconds(1), _processCouuntPerSec, ThrottleMode.Shaping)
                            .To(Sink.ActorRef<object>(Self, NotUsed.Instance))
                            .Run(_materializer);

                oldThrottler.Tell(PoisonPill.Instance);

            });


            ReceiveAsync<TodoQueue>(async msg =>
            {
                _throttler.Tell(new Todo()
                {
                    Id = msg.Todo.Id,
                    Title = msg.Todo.Title
                });
            });

            ReceiveAsync<Todo>(async msg =>
            {
                logger.Info($"{msg.Id} - {msg.Title}");
                // TODO Something

                if (consumer != null)
                {
                    consumer.Tell(msg);
                }
            });
        }
    }


링크 :




  • No labels
Write a comment…