Akka.net 액터메시지를 통해  분산처리및 로드밸런싱에 라우터장치를 살펴보고

분산처리 되는 목적지인 라우티를  아두이노 연결된 장비에 실제 램프를 켜는 변종실험을 해보겠습니다.

작동코드위치 : https://github.com/psmon/AkkaUno

아두이노란

아두이노란 물리적인 세계를 감지하고 제어할 수 있는 인터랙티브 객체들과 디지털 장치를 만들기 위한 도구로,

간단한 마이크로컨트롤러(Microcontroller) 보드를 기반으로 한 오픈 소스 컴퓨팅 플랫폼과 소프트웨어 개발 환경을 말합니다. 

다양한 장치들을 연결할수가 있으며, 여기서는 LED만제어하며  아두이노환경을 셋팅하는 부분은 생략하겠습니다.

시나리오구성

액터장치

  • ThrottleQueue : 메시지가 한꺼번에 간다고하면 분산처리되는것을 시각적으로 확인이 불가능하며 초당처리 흐름제어가 가능한 AkkaStream이 이용되었습니다.
  • RoundRobin(라우터) : 메시지가 들어오면, 그룹으로 묶인 액트에게 순차적으로 메시지를 분배합니다.
  • WorkActor : 각 작업액트로 메시지가 들어오면, UnoActor에 LED를 켜고끄는 시그널을 전송합니다. 액트(n):LED(n) 으로 1:1연결됩니다. ( 시리얼 통신 )
  • UnoActor : 우노장비의 시리얼통신만 담당합니다. 받은 문자열을 그대로 장비에 전송합니다.

코드구현

기기에 임베디드되는 우노코드

시리얼포트로부터 문자가 오면, 해당문자에 연결된 LED를 켜고/끄는 코드입니다.

비교적 간단한 기기제어부분만 C++로 프로그래밍하고, 기기를 제어하는 이벤트발생 부분은 Akka.net으로 분리하였습니다.


코드샘플 : https://github.com/psmon/AkkaUno/blob/master/UnoAkkaApp/Module/AduinoApp.ino


우노를 제어하는 액터코드

시리얼통신만 담당합니다.

using Akka.Actor;
using Akka.Event;
using System.IO.Ports;

namespace UnoAkkaApp.Actors
{
    public class UnoActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        // 참고 : 리눅스의 경우 SerialPortStream 사용(현재 윈도우 지원)
        private readonly SerialPort arduSerialPort;        

        public UnoActor()
        {
            arduSerialPort = new SerialPort();
            arduSerialPort.PortName = "COM3";   //아두이노가 연결된 시리얼 포트 번호 지정
            arduSerialPort.BaudRate = 9600;     //시리얼 통신 속도 지정
            arduSerialPort.Open();              //포트 오픈

            ReceiveAsync<string>(async command =>
            {
                logger.Info($"Receive : {command}");
                arduSerialPort.Write(command);
            });
        }
    }
}


워크 액터

nodeNo는 자신의 노드번호에 해당하며, 작업발생시 위 UnoActor를 이용하여 각노드에연결된 LED에 시그널을 보냅니다.

using Akka.Actor;
using Akka.Event;
using AkkaDotModule.Models;

namespace UnoAkkaApp.Actors
{
    public class WorkActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        private readonly IActorRef _unoActor;

        public WorkActor(IActorRef unoActor,int nodeNum)
        {
            _unoActor = unoActor;

            int _nodeNo = nodeNum;

            ReceiveAsync<BatchData>(async command =>
            {
                logger.Info($"Receive Data..");
                unoActor.Tell(nodeNum.ToString());
            });
        }
    }
}


액터를 조합하기 - 최종작동코드

            // start ActorSystem                        
            AkkaSystem = ActorSystem.Create("AkkaSystem");

            var unoActor = AkkaSystem.ActorOf(Props.Create(() => new UnoActor()), "unoActor");

            List<string> workActors = new List<string>();

            for (int i = 0; i < 9; i++)
            {
                string actorName = $"workActor{i + 1}";
                var curWorkActor = AkkaSystem.ActorOf(Props.Create(() => new WorkActor(unoActor, i + 1)), actorName);
                curWorkActor.Tell(new BatchData()); //연결된 Led가 켜지는지 Test(동시모드로 전체 LedOn)
                workActors.Add($"/user/{actorName}");
            }

            // 참고 : https://getakka.net/articles/actors/routers.html            
            var router = AkkaSystem.ActorOf(Props.Empty.WithRouter(new RoundRobinGroup(workActors)), "roundRobinGroup");

            // 밸브 Work : 초당 작업량을 조절                
            int timeSec = 1;
            int elemntPerSec = 2;
            var throttleWork = AkkaSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork");
            // 밸브 작업자를 지정
            throttleWork.Tell(new SetTarget(router));

            //분산처리할 100개의 샘플데이터생성
            List<object> batchDatas = new List<object>();
            for (int i = 0; i < 100; i++)
            {
                batchDatas.Add(new BatchData() { Data="SomeData" });
            }
            BatchList batchList = new BatchList(batchDatas.ToImmutableList());

            //100개의 데이터전송(벌크)
            throttleWork.Tell(batchList);
  • UnoActor : 우노장비의 시리얼통신을 담당합니다.
  • ThrottleWork : 램프의 점등을 위해 속도를 제어합니다. 여기서의 출력은 분산처리를 위해 라운드 로빈으로 연결됩니다.
  • WorkActor : N개를 생성할수 있는 분산노드입니다. 이벤트가 발생하면 연결된 Led를 점등합니다. 우노장비에게 통신을 할수 있는 액터참조를 가지게됩니다.
  • RoundRobinGroup : N개의 WorkActor를 등록하여 분산처리될수 있게합니다.


작동영상 - 라운드로빈

akkauno-led.mp4


라우터를 변경해보기

위 시연동영상은 라운드로빈을 사용하였기때문에 작업이 순차적으로 분배가됩니다.

라우터와 라우티는 로드밸런스에서 중요한 요소이며, 다음과 같은 의미를 가지고 있습니다.

  • 라우터 : 목적지(라우티)를 결정하는 장치
  • 라우티 : 목적지가 도달하는 위치
  • 라우팅 : 라우터에서 라우티까지 작동되는 과정 자체


AKKA초기 셋팅하는과정은 다소 번거롭지만, 다음과같이 AKKA에서 제공되는 라우터 변경만으로 

순차처리가 아닌 랜덤처리가 됩니다.

var router = AkkaSystem.ActorOf(Props.Empty.WithRouter(new RandomGroup(workActors)), "roundRobinGroup");
==>
var router = AkkaSystem.ActorOf(Props.Empty.WithRouter(new RandomGroup(workActors)), "randomGroup");


지금까지 작동코드를 AKKA없이 작동시켜야한다고 하면 다음과같은 방식을 선택할수 있으나, 

  • 지금까지의 코드를 큐와/스레드없이 구현한다고하면 sleep을 사용해서 램프의 속도와 타이밍을 맞추어야합니다.  구현난이도는 감소할수있지만 스레드를 차단함으로 성능을 크게 희생해야합니다.
  • 큐와/스레드를 직접구현한다고하면, 난이도가 높은 부자연스러운 코드가 많이 작성될것입니다. 구현난이도는 증가하며 성능과 안전성은 작성자의 수준에 따라 편차가 있을수 있습니다.

작동연상 - 랜덤

akkauni-random.mp4



다양한 라우터 제공

AKKA에서는 기초적인 라운드로빈 랜덤외에 다양한 라우터를 제공하여 작업분배시 활용할수가 있습니다. 

클러스터에서 활용한다고 하면,  로드밸런싱을 직접 설계할수가 있습니다.


몇가지 유용한 고급 제공되는 라우터를 알아보겠습니다.

ConsistentHashing

라우팅방식 : 데이터의 헤더값을 해시값으로 분석해, 특정 데이터가 고정된 노드에게 유도될수 있게 보장분배처리를 합니다.

로컬캐시는 일반적으로, 네트워크 중앙집중 캐시보다 빠르며, X라는 특정 사용자의 상태를 로컬자체에서 처리할때 활용될수 있습니다.



SmallestMailbox

라운드로빈은 Task작업완료시간이 동일한 경우, 유용할수 있으나 Task의 완료시간은 동일하지 않은 경우가 더 많습니다.

덜바쁜 라우티에게 우선분배가되어 전체적인 밸런스를 유지할수있을때 유용할수 있습니다.

TailChopping

다양한 원인으로 불특정노드만 일시적으로 느려지는 경우가 있습니다. 이 경우 활용될수 있으며

한가지 예로,  GC최적화가 안되었다고하면  GC수행을 할수 있는 휴식시간을 줄수 있습니다.

작동방식은, 꼬리자르기로 느려진 노드를 쉬게하는 방식으로 특정시간이상 응답이 느려진 노드를 일시적으로 라우터분배에서

제외하며 특정시간이 지나면 다시 참여를 시킵니다. 


더많은 라우터정보 참고: https://getakka.net/articles/actors/routers.html


응용편 - AKKA STREAM 

Akka와 아두이노의 기기를 연결하였으며, Akka에서 제공하는 Stream에의해 속도흐름제어를 할수 있습니다.

참고 : https://petabridge.com/blog/why-akkadotnet-streams/


가변저항장치의 값에따라 속도가 변경되며, 흐름제어를 하는 ThrottleWork에게 숫자값을 전송하여 처리속도를 동적으로 변경할수 있습니다.

using System;
using Akka;
using Akka.Actor;
using Akka.Event;
using Akka.Streams;
using Akka.Streams.Dsl;
using AkkaDotModule.Models;

namespace UnoAkkaApp.Actors
{
    public class ThrottleWork : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        private IActorRef consumer;

        private int countPerSec;

        public ThrottleWork(int element, int maxBust)
        {
            countPerSec = element;

            ReceiveAsync<SetTarget>(async target =>
            {
                consumer = target.Ref;
            });

            ReceiveAsync<int>(async count =>
            {
                countPerSec = count;

                logger.Info($"ThrottleWork Spped:{countPerSec}");
            });

            ReceiveAsync<BatchList>(async batchMessage =>
            {
                int Count = batchMessage.Obj.Count;
                Source<object, NotUsed> source = Source.From(batchMessage.Obj);

                using (var materializer = Context.Materializer())
                {
                    var factorials = source;
                    factorials
                         .Throttle(countPerSec, TimeSpan.FromSeconds(1), maxBust, ThrottleMode.Shaping)
                         .RunForeach(obj => {
                             var nowstr = DateTime.Now.ToString("mm:ss");
                             if (obj is BatchData batchData)
                             {
                                 if (consumer != null) consumer.Tell(batchData);
                             }
                         }, materializer)
                         .Wait();
                }
            });
        }
    }
}


작동영상 - 가변저항을 Stream에 연결하여 속도제어

akkauni-stream.mp4


단일기기에 배포하기

우노보드는 개발장비에 연결하여, 개발할때 유용하며(개발장비와 USB연결)

단일기기 스탠드얼론으로 작동하게 하기위해서는  아두이노가 지원되면서 윈도우/리눅스 OS 선택가능한 - 닷넷코어구동가능

미니피시로도 활용가능한 라떼판다를 통해 소형화(우노와 장비컨트롤 인터페이스 동일)가 가능합니다. 

우노장비

라떼판다


  • No labels
Write a comment…