Page History
Info |
---|
메시지 처리 패턴의 한 가지인 이벤트 버스는 대부분의 메시지 큐 시스템이 지원하며, 채널별 메시지 발행(Publish)/구독(Subscribe) 기능을 제공합니다. Akka에서는 EventStream을 활용한 EventBus 유틸리티를 통해 이벤트 필터 처리를 유연하고 간단하게 수행하는 Actor를 만들 수 있습니다. |
이벤트 버스를 활용하면 대상 액터에 필터 분기를 간단하게 지정할 수 있으며,
Actor의 기본 메시지 전달 기능을 통해 커스텀 Reply 기능도 구현할 수 있습니다.
Code Block | ||
---|---|---|
| ||
/// <summary>
/// Publishes one TypeA and one TypeB event on the system event stream and
/// checks that each probe observes the messages routed to its actor.
/// </summary>
/// <param name="testCount">Supplied by the inline data; not used by the test body.</param>
[Theory(DisplayName = "이벤트버스 테스트")]
[InlineData(1)]
public void Test1(int testCount)
{
    var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
    var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
    var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

    // Link ActorB to ActorC: ActorB passes every TypeA it handles on to ActorC.
    actorB.Tell(actorC);

    // Subscribe
    Sys.EventStream.Subscribe(actorA, typeof(TypeA));
    Sys.EventStream.Subscribe(actorB, typeof(TypeA));
    Sys.EventStream.Subscribe(actorC, typeof(TypeB));

    // Publish
    Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" });
    Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

    // UnitTest
    probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
    probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));

    // probeC gets TypeB straight from the event stream and TypeA through ActorB's
    // forward. Those two paths are not ordered relative to each other, so collect
    // both messages first and assert on their types without assuming an order.
    var receivedByC = probeC.ReceiveN(2, TimeSpan.FromSeconds(2));
    Assert.Contains(receivedByC, m => m is TypeB);
    Assert.Contains(receivedByC, m => m is TypeA);
} |
전체코드
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using AkkaNetCoreTest;
using System;
using Xunit;
using Xunit.Abstractions;
namespace TestAkkaDotModule.TestActors
{
// https://getakka.net/articles/utilities/event-bus.html
/// <summary>
/// Message class carrying a single text payload.
/// </summary>
public class TypeA
{
/// <summary>Text content of the message.</summary>
public string Text { get; set; }
}
/// <summary>
/// Message class carrying a single text payload; a distinct type from TypeA.
/// </summary>
public class TypeB
{
/// <summary>Text content of the message.</summary>
public string Text { get; set; }
}
/// <summary>
/// Actor that handles <see cref="TypeA"/> messages: it logs the message text
/// and then tells the same message to the probe given at construction.
/// </summary>
public class ActorA : ReceiveActor
{
    private readonly ILoggingAdapter logger = Context.GetLogger();

    // Receives a copy of every TypeA message this actor handles.
    protected IActorRef probe;

    /// <summary>
    /// Creates the actor and registers its TypeA handler.
    /// </summary>
    /// <param name="_probe">Actor reference that each handled message is told to.</param>
    public ActorA(IActorRef _probe)
    {
        probe = _probe;

        // The handler awaits nothing, so the synchronous Receive is used instead of
        // ReceiveAsync with an await-less async lambda (compiler warning CS1998).
        Receive<TypeA>(message =>
        {
            logger.Debug($"InMessage:{message.Text}");
            probe.Tell(message);
        });
    }
}
/// <summary>
/// Actor that handles <see cref="TypeA"/> messages by logging them and telling
/// them to a probe and, once one has been set, to a reply target. The reply
/// target is set by sending this actor an <see cref="IActorRef"/> message.
/// </summary>
public class ActorB : ReceiveActor
{
    private readonly ILoggingAdapter logger = Context.GetLogger();

    // Receives a copy of every TypeA message this actor handles (skipped when null).
    protected IActorRef probe;

    // Optional second recipient; stays null until an IActorRef message arrives.
    protected IActorRef reply;

    /// <summary>
    /// Creates the actor and registers its TypeA and IActorRef handlers.
    /// </summary>
    /// <param name="_probe">Actor reference that each handled TypeA message is told to; may be null.</param>
    public ActorB(IActorRef _probe)
    {
        probe = _probe;

        // Neither handler awaits anything, so the synchronous Receive is used instead
        // of ReceiveAsync with an await-less async lambda (compiler warning CS1998).
        Receive<TypeA>(message =>
        {
            logger.Debug($"InMessage:{message.Text}");
            probe?.Tell(message);
            reply?.Tell(message);
        });

        // An actor reference sent as a message becomes the new reply target.
        Receive<IActorRef>(actorRef =>
        {
            reply = actorRef;
        });
    }
}
/// <summary>
/// Actor that handles both <see cref="TypeA"/> and <see cref="TypeB"/> messages:
/// each one is logged and then told to the probe given at construction.
/// </summary>
public class ActorC : ReceiveActor
{
    private readonly ILoggingAdapter logger = Context.GetLogger();

    // Receives a copy of every TypeA / TypeB message this actor handles.
    protected IActorRef probe;

    /// <summary>
    /// Creates the actor and registers its TypeA and TypeB handlers.
    /// </summary>
    /// <param name="_probe">Actor reference that each handled message is told to.</param>
    public ActorC(IActorRef _probe)
    {
        probe = _probe;

        // Neither handler awaits anything, so the synchronous Receive is used instead
        // of ReceiveAsync with an await-less async lambda (compiler warning CS1998).
        Receive<TypeA>(message =>
        {
            logger.Debug($"InMessage:{message.Text}");
            probe.Tell(message);
        });

        Receive<TypeB>(message =>
        {
            logger.Debug($"InMessage:{message.Text}");
            probe.Tell(message);
        });
    }
}
/// <summary>
/// Test fixture that routes TypeA / TypeB events through the actor system's
/// event stream to ActorA, ActorB and ActorC, observing each actor via a test probe.
/// </summary>
public class EventBusTest : TestKitXunit
{
    // One probe per actor under test; each actor tells its handled messages to its probe.
    protected TestProbe probeA;
    protected TestProbe probeB;
    protected TestProbe probeC;

    /// <summary>
    /// Builds the fixture and creates the probes.
    /// </summary>
    /// <param name="output">xUnit output helper handed to the base class.</param>
    public EventBusTest(ITestOutputHelper output) : base(output)
    {
        Setup();
    }

    /// <summary>
    /// Creates the three named test probes used by the test.
    /// </summary>
    public void Setup()
    {
        probeA = this.CreateTestProbe("probeA");
        probeB = this.CreateTestProbe("probeB");
        probeC = this.CreateTestProbe("probeC");
    }

    /// <summary>
    /// Publishes one TypeA and one TypeB event on the system event stream and
    /// checks that each probe observes the messages routed to its actor.
    /// </summary>
    /// <param name="testCount">Supplied by the inline data; not used by the test body.</param>
    [Theory(DisplayName = "이벤트버스 테스트")]
    [InlineData(1)]
    public void Test1(int testCount)
    {
        var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
        var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
        var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

        // Link ActorB to ActorC: ActorB passes every TypeA it handles on to ActorC.
        actorB.Tell(actorC);

        // Subscribe
        Sys.EventStream.Subscribe(actorA, typeof(TypeA));
        Sys.EventStream.Subscribe(actorB, typeof(TypeA));
        Sys.EventStream.Subscribe(actorC, typeof(TypeB));

        // Publish
        Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" });
        Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

        // UnitTest
        probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
        probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));

        // probeC gets TypeB straight from the event stream and TypeA through ActorB's
        // forward. Those two paths are not ordered relative to each other, so collect
        // both messages first and assert on their types without assuming an order.
        var receivedByC = probeC.ReceiveN(2, TimeSpan.FromSeconds(2));
        Assert.Contains(receivedByC, m => m is TypeB);
        Assert.Contains(receivedByC, m => m is TypeA);
    }
}
}
|