Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 클러스터 싱글톤 액터 : 클러스터내에 단 하나만 작동가능 ( 해당 롤을가진 노드가 다운되면, 다른 노드에게 자동 위임합니다.)
  • FSM Actor : 실시간 메시지에대한 상태머신컨셉을 적용할수 있으며. 특정 초만큼 메시징을 모아 컬렉션 처리가 가능합니다.
  • 벌크 Insert : 대량 업데이트시 사용되며, 천건 건바이건으로 인서트 하는것보다.  십만건 벌크처리를 하는것이 훨씬 시간이 적게 소요됩니다.

싱글톤 액터 구현

싱글톤 액터는 클러스터내에 단하나만 존재하는 액터의 역활만 가지고 있으며

배치처리를 하는 기능 액터는 싱글톤의 Child로 존재하게 되면되겠습니다.

Code Block
themeEmacs
using System;
using System.Collections.Generic;
using Akka.Actor;

namespace AkkaNetCore.Actors
{
    public class SingleToneActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();
        private readonly string id;                   
        private IActorRef BatchWriter_Rev;
        private IActorRef BatchManager_Rev;
		
        public SingleToneActor()
        {
            BatchWriter_Rev = Context.ActorOf<BatchWriterActor>();
            BatchManager_Rev = Context.ActorOf(Props.Create(() => new BatchActor(10)));
            BatchManager_Rev.Tell(new SetTarget(BatchWriter_Rev));

            id = Guid.NewGuid().ToString();
            logger.Info($"싱글톤 액터 생성:{id}");
            startTime = DateTime.Now;            
            totalCount = 0;

            ReceiveAsync<DelayMsg>(async msg =>
            {
				BatchManager_Rev.Tell(new Queue(msg));
            });
        }
    }
}
//싱글톤 클러스터 액터작동                
var actor = AkkaBoostrap.BootstrapSingleton<SingleToneActor>(actorSystem, "SingleToneActor", "akkanet");
SingleToneActor = AkkaBoostrap.BootstrapSingletonProxy(actorSystem, "SingleToneActor", 
	"akkanet", "/user/SingleToneActor", "singleToneActorProxy");
git : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Extensions/AkkaBoostrap.cs

싱글톤 액터는 클러스터내에 단하나만 존재하는 액터이며, 싱글톤 액터는 다시 배치처리를 하는 기능 액터를

자식으로 가지고 있으면 됩니다.

...

싱들톤 액터가 가져야할 Child인 배치처리 액터는 상태머신(FSM)을 이용하였으며 다음과 같이 이용이가능합니다.


배치FMS 액터

BatchActor는 생성자 옵션으로 특정기간 수집해야할 시간값이 전달되며, 특정시간동안 수집된 실시간성 이벤트 컬렉션을 한꺼번에 처리할수있습니다.

아래코드는 , 작성된 BatchActor 의 작동검증을 하는 코드이며, Actor는 계층구조를 가지고있으며 필요한 Actor의 하위 어디든 위치를 시켜 활용하면 되겠습니다.

Code Block
themeEmacs
namespace AkkaNetCoreTest.Actors
{
    // 특정기간동안 집계된 벌크 컬렉션은 이 액터에게 온다.
    public class TestBatchWriterActor : ReceiveActor
    {
        protected IActorRef probe;

        public TestBatchWriterActor(IActorRef _probe)
        {
            probe = _probe;
            ReceiveAsync<object>(async message =>
            {
                if (message is Batch batchMessage)
                {
                    probe.Tell(batchMessage);
                    Console.WriteLine($"====== TODO 배치수행 :{batchMessage.Obj.Count}");                    
                }
            });
        }
    }

    class BatchActorTest : TestKit
    {
        protected TestProbe probe;

        [SetUp]
        public void Setup()
        {
            //배치가 컬렉션단위로 잘 수행하는지 관찰자 셋팅
            probe = this.CreateTestProbe();
        }

        // 테스트목적 : 이벤트가 발생할때마다 DB저장이 아닌, 특정시간 수집된 구간의 데이터 벌크인서트처리목적(벌크인서트는 건바이건보다 빠르다)
        // 벌크를 만드는 주기를 3초(collectSec)로 지정..
        [TestCase(3)]
        public void LazyBatchAreOK(int collectSec)
        {            
            var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec)));

            //배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함
            IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe)));
            batchActor.Tell(new SetTarget(batchWriterActor));

            //이벤트는 실시간적으로 발생한다.
            batchActor.Tell(new Queue("오브젝트1"));
            batchActor.Tell(new Queue("오브젝트2"));
            batchActor.Tell(new Queue("오브젝트3"));

            //배치 처리할것이 없는것 확인
            probe.ExpectNoMsg();

            //배치 항목을 검사 : collectSec+1초를 기다려줌            
            var batchList = probe.ExpectMsg<Batch>(TimeSpan.FromSeconds(collectSec+1)).Obj;
            
            var firstItem = batchList[0] as string;
            Assert.AreEqual("오브젝트1", firstItem);
            Assert.AreEqual(3, batchList.Count);

            //이벤트는 실시간적으로 발생한다.
            batchActor.Tell(new Queue("오브젝트4"));
            batchActor.Tell(new Queue("오브젝트5"));
            batchActor.Tell(new Queue("오브젝트6"));
            batchActor.Tell(new Queue("오브젝트7"));

            //강제 벌크요청
            batchActor.Tell(new Flush());

            //배치 항목을 검사
            batchList = probe.ExpectMsg<Batch>().Obj;
            firstItem = batchList[0] as string;
            Assert.AreEqual("오브젝트4", firstItem);
            Assert.AreEqual(4, batchList.Count);

        }

        [TestCase(3,2)]
        public void LazyBatchAreEmpty(int collectSec,int cutoffSec)
        {
            var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec)));
            //배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함
            IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe)));
            batchActor.Tell(new SetTarget(batchWriterActor));

            //이벤트는 실시간적으로 발생한다.
            batchActor.Tell(new Queue("오브젝트1"));
            batchActor.Tell(new Queue("오브젝트2"));
            batchActor.Tell(new Queue("오브젝트3"));

            //cutoffSec 이전에는 처리할것이 없다.
            probe.ExpectNoMsg(TimeSpan.FromSeconds(cutoffSec));
        }
    }
}
git : 


연관 코드들: