Page History
...
Code Block | ||
---|---|---|
| ||
using System; using System.Collections.Generic; using Akka.Actor; namespace AkkaNetCore.Actors { public class SingleToneActor : ReceiveActor { private readonly ILoggingAdapter logger = Context.GetLogger(); private readonly string id; private IActorRef BatchWriter_Rev; private IActorRef BatchManager_Rev; public SingleToneActor() { BatchWriter_Rev = Context.ActorOf<BatchWriterActor>(); BatchManager_Rev = Context.ActorOf(Props.Create(() => new BatchActor(10))); BatchManager_Rev.Tell(new SetTarget(BatchWriter_Rev)); id = Guid.NewGuid().ToString(); logger.Info($"싱글톤 액터 생성:{id}"); startTime = DateTime.Now; totalCount = 0; ReceiveAsync<DelayMsg>(async msg => { BatchManager_Rev.Tell(new Queue(msg)); }); } } } //싱글톤 클러스터 액터작동 var actor = AkkaBoostrap.BootstrapSingleton<SingleToneActor>(actorSystem, "SingleToneActor", "akkanet"); SingleToneActor = AkkaBoostrap.BootstrapSingletonProxy(actorSystem, "SingleToneActor", "akkanet", "/user/SingleToneActor", "singleToneActorProxy"); git : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Extensions/AkkaBoostrap.cs |
싱글톤 액터는 클러스터내에 단하나만 존재하는 액터이며, 싱글톤 액터는 다시 배치처리를 하는 기능 액터를
자식으로 가지고 있으면 됩니다.
배치FMS 액터
Code Block |
---|
namespace AkkaNetCoreTest.Actors
{
// 특정기간동안 집계된 벌크 컬렉션은 이 액터에게 온다.
public class TestBatchWriterActor : ReceiveActor
{
protected IActorRef probe;
public TestBatchWriterActor(IActorRef _probe)
{
probe = _probe;
ReceiveAsync<object>(async message =>
{
if (message is Batch batchMessage)
{
probe.Tell(batchMessage);
Console.WriteLine($"====== TODO 배치수행 :{batchMessage.Obj.Count}");
}
});
}
}
class BatchActorTest : TestKit
{
protected TestProbe probe;
[SetUp]
public void Setup()
{
//배치가 컬렉션단위로 잘 수행하는지 관찰자 셋팅
probe = this.CreateTestProbe();
}
// 테스트목적 : 이벤트가 발생할때마다 DB저장이 아닌, 특정시간 수집된 구간의 데이터 벌크인서트처리목적(벌크인서트는 건바이건보다 빠르다)
// 벌크를 만드는 주기를 3초(collectSec)로 지정..
[TestCase(3)]
public void LazyBatchAreOK(int collectSec)
{
var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec)));
//배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함
IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe)));
batchActor.Tell(new SetTarget(batchWriterActor));
//이벤트는 실시간적으로 발생한다.
batchActor.Tell(new Queue("오브젝트1"));
batchActor.Tell(new Queue("오브젝트2"));
batchActor.Tell(new Queue("오브젝트3"));
//배치 처리할것이 없는것 확인
probe.ExpectNoMsg();
//배치 항목을 검사 : collectSec+1초를 기다려줌
var batchList = probe.ExpectMsg<Batch>(TimeSpan.FromSeconds(collectSec+1)).Obj;
var firstItem = batchList[0] as string;
Assert.AreEqual("오브젝트1", firstItem);
Assert.AreEqual(3, batchList.Count);
//이벤트는 실시간적으로 발생한다.
batchActor.Tell(new Queue("오브젝트4"));
batchActor.Tell(new Queue("오브젝트5"));
batchActor.Tell(new Queue("오브젝트6"));
batchActor.Tell(new Queue("오브젝트7"));
//강제 벌크요청
batchActor.Tell(new Flush());
//배치 항목을 검사
batchList = probe.ExpectMsg<Batch>().Obj;
firstItem = batchList[0] as string;
Assert.AreEqual("오브젝트4", firstItem);
Assert.AreEqual(4, batchList.Count);
}
[TestCase(3,2)]
public void LazyBatchAreEmpty(int collectSec,int cutoffSec)
{
var batchActor = Sys.ActorOf(Props.Create(() => new BatchActor(collectSec)));
//배치저리 담당자 지정 : 배치처리를 검사하는 관찰자를 등록함
IActorRef batchWriterActor = Sys.ActorOf(Props.Create(() => new TestBatchWriterActor(probe)));
batchActor.Tell(new SetTarget(batchWriterActor));
//이벤트는 실시간적으로 발생한다.
batchActor.Tell(new Queue("오브젝트1"));
batchActor.Tell(new Queue("오브젝트2"));
batchActor.Tell(new Queue("오브젝트3"));
//cutoffSec 이전에는 처리할것이 없다.
probe.ExpectNoMsg(TimeSpan.FromSeconds(cutoffSec));
}
}
}
git : https://github.com/psmon/AkkaForNetCore/blob/master/AkkaNetCore/Actors/Utils/BatchActor.cs |