You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

개요

클러스터 기능으로 유연하게 Job을 노드단위로 늘어난다고 했을때, 각 노드에서 발생하는

이벤트를 건바이건으로 RDB 적재하는것은 비효율적입니다. 

아카의 클러스터 메시징은 병목없이 TPS를 늘려갈수 있으나, 단일 Db는 그렇지 못합니다.

짧은 시간에 생긴 이벤트에 대해 N개의 인서트 쿼리를 작동시키는것이아닌 단일 인스턴스에게

이벤트 적재를 위임함으로 효율적으로 적재할수 있으며, 3가지 기술을 활용하여

이문제를 해결할수 있습니다.

  • 클러스터 싱글톤 액터 : 클러스터내에 단 하나만 작동가능 ( 해당 롤을가진 노드가 다운되면, 다른 노드에게 자동 위임합니다.)
  • FSM Actor : 실시간 메시지에대한 상태머신컨셉을 적용할수 있으며. 특정 초만큼 메시징을 모아 컬렉션 처리가 가능합니다.
  • 벌크 Insert : 대량 업데이트시 사용되며, 천건 건바이건으로 인서트 하는것보다.  십만건 벌크처리를 하는것이 훨씬 시간이 적게 소요됩니다.

싱글톤 액터 구현

using System;
using System.Collections.Generic;
using Akka.Actor;

namespace AkkaNetCore.Actors
{
    public class SingleToneActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();
        private readonly string id;                   
        private IActorRef BatchWriter_Rev;
        private IActorRef BatchManager_Rev;
		
        public SingleToneActor()
        {
            BatchWriter_Rev = Context.ActorOf<BatchWriterActor>();
            BatchManager_Rev = Context.ActorOf(Props.Create(() => new BatchActor(10)));
            BatchManager_Rev.Tell(new SetTarget(BatchWriter_Rev));

            id = Guid.NewGuid().ToString();
            logger.Info($"싱글톤 액터 생성:{id}");
            startTime = DateTime.Now;            
            totalCount = 0;

            ReceiveAsync<DelayMsg>(async msg =>
            {
				BatchManager_Rev.Tell(new Queue(msg));
            });
        }
    }
}
//싱글톤 클러스터 액터작동                
var actor = AkkaBoostrap.BootstrapSingleton<SingleToneActor>(actorSystem, "SingleToneActor", "akkanet");
SingleToneActor = AkkaBoostrap.BootstrapSingletonProxy(actorSystem, "SingleToneActor", 
	"akkanet", "/user/SingleToneActor", "singleToneActorProxy");
git : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Extensions/AkkaBoostrap.cs

싱글톤 액터는 클러스터내에 단하나만 존재하는 액터이며, 싱글톤 액터는 다시 배치처리를 하는 기능 액터를

자식으로 가지고 있으면 됩니다.


배치FMS 액터

namespace AkkaNetCoreTest.Actors
{
    // 특정기간동안 집계된 벌크 컬렉션은 이 액터에게 온다.
    public class TestBatchWriterActor : ReceiveActor
    {
        protected IActorRef probe;

        public TestBatchWriterActor(IActorRef _probe)
        {
            probe = _probe;
            ReceiveAsync<object>(async message =>
            {
                if (message is Batch batchMessage)
                {
                    probe.Tell(batchMessage);
                    Console.WriteLine($"====== TODO 배치수행 :{batchMessage.Obj.Count}");                    
                }
            });
        }
    }

    class BatchActorTest : TestKit
    {
        protected TestProbe probe;

        [SetUp]
        public void Setup()
        {
            //배치가 컬렉션단위로 잘 수행하는지 관찰자 셋팅
            probe = this.CreateTestProbe();
        }

        // 테스트목적 : 이벤트가 발생할때마다 DB저장이 아닌, 특정시간 수집된 구간의 데이터 벌크인서트처리목적(벌크인서트는 건바이건보다 빠르다)
        // 벌크를 만드는 주기를 3초(collectSec)로 지정..
        [TestCase(3)]
        public void LazyBatchAreOK(int collectSec)
        {            
            var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec)));

            //배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함
            IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe)));
            batchActor.Tell(new SetTarget(batchWriterActor));

            //이벤트는 실시간적으로 발생한다.
            batchActor.Tell(new Queue("오브젝트1"));
            batchActor.Tell(new Queue("오브젝트2"));
            batchActor.Tell(new Queue("오브젝트3"));

            //배치 처리할것이 없는것 확인
            probe.ExpectNoMsg();

            //배치 항목을 검사 : collectSec+1초를 기다려줌            
            var batchList = probe.ExpectMsg<Batch>(TimeSpan.FromSeconds(collectSec+1)).Obj;
            
            var firstItem = batchList[0] as string;
            Assert.AreEqual("오브젝트1", firstItem);
            Assert.AreEqual(3, batchList.Count);

            //이벤트는 실시간적으로 발생한다.
            batchActor.Tell(new Queue("오브젝트4"));
            batchActor.Tell(new Queue("오브젝트5"));
            batchActor.Tell(new Queue("오브젝트6"));
            batchActor.Tell(new Queue("오브젝트7"));

            //강제 벌크요청
            batchActor.Tell(new Flush());

            //배치 항목을 검사
            batchList = probe.ExpectMsg<Batch>().Obj;
            firstItem = batchList[0] as string;
            Assert.AreEqual("오브젝트4", firstItem);
            Assert.AreEqual(4, batchList.Count);

        }

        [TestCase(3,2)]
        public void LazyBatchAreEmpty(int collectSec,int cutoffSec)
        {
            var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec)));
            //배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함
            IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe)));
            batchActor.Tell(new SetTarget(batchWriterActor));

            //이벤트는 실시간적으로 발생한다.
            batchActor.Tell(new Queue("오브젝트1"));
            batchActor.Tell(new Queue("오브젝트2"));
            batchActor.Tell(new Queue("오브젝트3"));

            //cutoffSec 이전에는 처리할것이 없다.
            probe.ExpectNoMsg(TimeSpan.FromSeconds(cutoffSec));
        }
    }
}
git : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Actors/Utils/BatchActor.cs












  • No labels