시스템내에 유일한 단일 객체로 제약을 걸때 싱글톤은 유용하게 사용 되어왔습니다.
분산처리 컴퓨팅에서 싱글톤이란, 여러가지 클러스터로 구성된 동등한 어플리케이션에서
클러스터내에 단 하나만 필요한 기능만 수행할수 있습니다.
개요
클러스터 기능으로 유연하게 Job을 노드단위로 늘어난다고 했을때, 각 노드에서 발생하는
이벤트를 건바이건으로 RDB 적재하는것은 비효율적입니다.
아카의 클러스터 메시징은 병목없이 TPS를 늘려갈수 있으나, 단일 Db는 그렇지 못합니다.
짧은 시간에 생긴 이벤트에 대해 N개의 인서트 쿼리를 작동시키는것이아닌 단일 인스턴스에게
이벤트 적재를 위임함으로 효율적으로 적재할수 있으며, 3가지 기술을 활용하여
이문제를 해결할수 있습니다.
- 클러스터 싱글톤 액터 : 클러스터내에 단 하나만 작동가능 ( 해당 롤을가진 노드가 다운되면, 다른 노드에게 자동 위임합니다.)
- FSM Actor : 실시간 메시지에대한 상태머신컨셉을 적용할수 있으며. 특정 초만큼 메시징을 모아 컬렉션 처리가 가능합니다.
- 벌크 Insert : 대량 업데이트시 사용되며, 천건 건바이건으로 인서트 하는것보다. 십만건 벌크처리를 하는것이 훨씬 시간이 적게 소요됩니다.
클러스터 싱글톤 액터 구현
클러스터 싱글톤 액터는 클러스터내에 단하나만 존재하는 액터의 역활만 가지고 있으며
배치처리를 하는 기능 액터는 싱글톤의 Child로 존재하게 되면되겠습니다.
클러스터내에 싱글톤이 유지되는 방식
- a,b,c 3개의 노드가 떳고 a가 싱글톤 기능을 가지고 있다고 가정해봅시다.
- 스케일 다운을 위해 a가 다운이 되면, b와 c중 싱글톤 역활을 하며 위임됩니다.
- 만약 특정 노드가 다운될일이없고 고정하고 싶다고하면, 클러스터내에 싱글톤 Role을 하나의 노드에만 고정하면됩니다.
using System; using System.Collections.Generic; using Akka.Actor; namespace AkkaNetCore.Actors { public class SingleToneActor : ReceiveActor { private readonly ILoggingAdapter logger = Context.GetLogger(); private readonly string id; private IActorRef BatchWriter_Rev; private IActorRef BatchManager_Rev; public SingleToneActor() { BatchWriter_Rev = Context.ActorOf<BatchWriterActor>(); BatchManager_Rev = Context.ActorOf(Props.Create(() => new BatchActor(10))); BatchManager_Rev.Tell(new SetTarget(BatchWriter_Rev)); id = Guid.NewGuid().ToString(); logger.Info($"싱글톤 액터 생성:{id}"); startTime = DateTime.Now; totalCount = 0; ReceiveAsync<DelayMsg>(async msg => { BatchManager_Rev.Tell(new Queue(msg)); }); } } } //싱글톤 클러스터 액터작동 var actor = AkkaBoostrap.BootstrapSingleton<SingleToneActor>(actorSystem, "SingleToneActor", "akkanet"); SingleToneActor = AkkaBoostrap.BootstrapSingletonProxy(actorSystem, "SingleToneActor", "akkanet", "/user/SingleToneActor", "singleToneActorProxy");
싱들톤 액터가 가져야할 Child인 배치처리 액터는 상태머신(FSM)을 이용하였으며 다음과 같이 이용이가능합니다.
배치FMS 액터
BatchActor는 생성자 옵션으로 특정기간 수집해야할 시간값이 전달되며, 특정시간동안 수집된 실시간성 이벤트 컬렉션을 한꺼번에 처리할수있습니다.
아래코드는 , 작성된 BatchActor 의 작동검증을 하는 코드이며, Actor는 계층구조를 가지고있으며 필요한 Actor의 하위 어디든 위치를 시켜 활용하면 되겠습니다.
namespace AkkaNetCoreTest.Actors { // 특정기간동안 집계된 벌크 컬렉션은 이 액터에게 온다. public class TestBatchWriterActor : ReceiveActor { protected IActorRef probe; public TestBatchWriterActor(IActorRef _probe) { probe = _probe; ReceiveAsync<object>(async message => { if (message is Batch batchMessage) { probe.Tell(batchMessage); //벌크Insert 를 사용할수 있다. Console.WriteLine($"====== TODO 배치수행 :{batchMessage.Obj.Count}"); } }); } } class BatchActorTest : TestKit { protected TestProbe probe; [SetUp] public void Setup() { //배치가 컬렉션단위로 잘 수행하는지 관찰자 셋팅 probe = this.CreateTestProbe(); } // 테스트목적 : 이벤트가 발생할때마다 DB저장이 아닌, 특정시간 수집된 구간의 데이터 벌크인서트처리목적(벌크인서트는 건바이건보다 빠르다) // 벌크를 만드는 주기를 3초(collectSec)로 지정.. [TestCase(3)] public void LazyBatchAreOK(int collectSec) { var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec))); //배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함 IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe))); batchActor.Tell(new SetTarget(batchWriterActor)); //이벤트는 실시간적으로 발생한다. batchActor.Tell(new Queue("오브젝트1")); batchActor.Tell(new Queue("오브젝트2")); batchActor.Tell(new Queue("오브젝트3")); //배치 처리할것이 없는것 확인 probe.ExpectNoMsg(); //배치 항목을 검사 : collectSec+1초를 기다려줌 var batchList = probe.ExpectMsg<Batch>(TimeSpan.FromSeconds(collectSec+1)).Obj; var firstItem = batchList[0] as string; Assert.AreEqual("오브젝트1", firstItem); Assert.AreEqual(3, batchList.Count); //이벤트는 실시간적으로 발생한다. batchActor.Tell(new Queue("오브젝트4")); batchActor.Tell(new Queue("오브젝트5")); batchActor.Tell(new Queue("오브젝트6")); batchActor.Tell(new Queue("오브젝트7")); //강제 벌크요청 batchActor.Tell(new Flush()); //배치 항목을 검사 batchList = probe.ExpectMsg<Batch>().Obj; firstItem = batchList[0] as string; Assert.AreEqual("오브젝트4", firstItem); Assert.AreEqual(4, batchList.Count); } [TestCase(3,2)] public void LazyBatchAreEmpty(int collectSec,int cutoffSec) { var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec))); //배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함 IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe))); batchActor.Tell(new SetTarget(batchWriterActor)); //이벤트는 실시간적으로 발생한다. batchActor.Tell(new Queue("오브젝트1")); batchActor.Tell(new Queue("오브젝트2")); batchActor.Tell(new Queue("오브젝트3")); //cutoffSec 이전에는 처리할것이 없다. probe.ExpectNoMsg(TimeSpan.FromSeconds(cutoffSec)); } } }
Flush VS Clear
둘은 버퍼를 비우는 의미에서 유사하지만, 큰 차이가 있습니다.
Clear는 버퍼를 비우고 아무 할일이 없지만, Flush의 경우는 버퍼를 비우고 일괄처리를 진행하겠다란 의미가 포함되어 있습니다.
Flush의 경우 비동기로 쌓인 큐를 ,기능에 따라 전송또는 저장하는 액션이 발생할수 있습니다.
BulkInsert
이벤트마다 1건씩 Insert처리하는것은 여러모로 비효율적이며, ORM을 통해 한꺼번에 영속화(메모리->DB저장소)를 한다고 해도 비효율적인 성능이 발생함을 알수가 있습니다.
이벤트는 여러모로 BulkInsert 방식으로 저장가능해야하고, ORM이냐? SQL이냐? 문제는 중요하지 않습니다.
단지 ORM의 도움을 받았을때 더 스마트하고 짧은 코드로 처리가 가능합니다. ( Message Object → Entity Object → 데이터베이스에 영속화 )
모니터링
벌크처리능력을 매트릭스를 통해 처리량을 집계능력을 집계할때 유용할수 있습니다.
샘플은 AKKA 모니터링이 활용되었으며 데이터독과 연동되으며.., 이벤트 카운트를 1씩올리는게아닌
벌크처리된 카운팅을 올리면 되겠습니다.
if (bulkItems_completed.Count > 0 && IsWriteDB) { BatchType = "completed"; EntityFrameworkManager.ContextFactory = context => new BatchRepository(Startup.AppSettings); using (var context = new BatchRepository(Startup.AppSettings)) { await context.BulkInsertAsync(bulkItems_completed, options => { options.BatchSize = BatchSize; }); Context.IncrementCounter("akka.custom.received1", bulkItems_completed.Count); } }
참고 : Real time performance counters
연관 코드들:
- 배치처리를 포함한 싱글톤 액터 : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Actors/Study/SingleToneActor.cs
- FSM기능을 가진 배치처리 액터 : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Actors/Utils/BatchActor.cs
- 싱글톤 액터 생성하기 : https://github.com/psmon/AkkaForNetCore/blob/025e04a74dba7389cc7675ebaccd403d959a129b/AkkaNetCore/Startup.cs#L184
- 클러스터 설정 : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/akka.Development.conf
- 벌크 Insert : https://entityframeworkcore.com/saving-data-bulk-insert