메시지 처리 패턴의 한 가지인 이벤트 버스는 대부분의 메시지 큐 시스템이 지원하며, 채널별 메시지 발행(Publish)/구독(Subscribe) 기능을 가지고 있습니다. Akka에서는 EventStream을 활용한 EventBus 유틸리티를 통해, 유연하고 심플하게 이벤트를 필터링하는 Actor를 만들 수 있습니다. |
이벤트 버스를 활용한 필터 분기를 대상 액터에게 심플하게 지정할 수 있으며
Actor의 기본 메시지 전달 기능을 통해, 커스텀한 Reply 기능 탑재도 가능합니다.
다이어그램에 설계한 메시지 Flow를 구현하고, 유닛 테스트를 통해 메시지를 검증할 수 있습니다.
/// <summary>
/// Publishes one <c>TypeA</c> and one <c>TypeB</c> event on the actor system's
/// event stream and verifies which probes observe them.
/// ActorA and ActorB subscribe to <c>TypeA</c>; ActorC subscribes to <c>TypeB</c>
/// and additionally receives every <c>TypeA</c> that ActorB relays to it.
/// </summary>
/// <param name="testCount">Supplied by <c>InlineData</c>; not read by the test body.</param>
[Theory(DisplayName = "이벤트버스 테스트")]
[InlineData(1)]
public void Test1(int testCount)
{
    var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
    var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
    var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

    // Link ActorB to ActorC: ActorB stores the reference and relays each TypeA to it.
    actorB.Tell(actorC);

    // Subscribe
    Sys.EventStream.Subscribe(actorA, typeof(TypeA));
    Sys.EventStream.Subscribe(actorB, typeof(TypeA));
    Sys.EventStream.Subscribe(actorC, typeof(TypeB));

    // Publish
    Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" });
    Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

    // Verify
    probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
    probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));

    // ActorC gets TypeB directly from the event stream and TypeA relayed through
    // ActorB. The two messages travel different sender/receiver paths, so their
    // arrival order is not guaranteed; assert on presence rather than on order.
    var receivedByC = probeC.ReceiveN(2, TimeSpan.FromSeconds(1));
    Assert.Contains(receivedByC, m => m is TypeA);
    Assert.Contains(receivedByC, m => m is TypeB);
}
git : https://github.com/psmon/AkkaDotModule/blob/master/TestAkkaDotModule/TestActors/EventBusTest.cs
using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using AkkaNetCoreTest;
using System;
using Xunit;
using Xunit.Abstractions;

namespace TestAkkaDotModule.TestActors
{
    // Reference: https://getakka.net/articles/utilities/event-bus.html

    /// <summary>Event payload published on the event stream; ActorA and ActorB subscribe to it in the test.</summary>
    public class TypeA
    {
        public string Text { get; set; }
    }

    /// <summary>Event payload published on the event stream; ActorC subscribes to it in the test.</summary>
    public class TypeB
    {
        public string Text { get; set; }
    }

    /// <summary>
    /// Logs every <see cref="TypeA"/> it receives and forwards it to the probe
    /// passed to the constructor.
    /// </summary>
    public class ActorA : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected readonly IActorRef probe;

        /// <param name="_probe">Actor that receives a copy of every handled message.</param>
        public ActorA(IActorRef _probe)
        {
            probe = _probe;

            // The handler does no asynchronous work, so a synchronous Receive is used.
            Receive<TypeA>(message =>
            {
                logger.Debug($"InMessage:{message.Text}");
                probe.Tell(message);
            });
        }
    }

    /// <summary>
    /// Logs every <see cref="TypeA"/> it receives, forwards it to the probe and,
    /// once a reply target has been registered, relays it to that target as well.
    /// A reply target is registered by sending this actor an <see cref="IActorRef"/>.
    /// </summary>
    public class ActorB : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected readonly IActorRef probe;

        // Set when an IActorRef message arrives; null until then.
        protected IActorRef reply;

        /// <param name="_probe">Actor that receives a copy of every handled <see cref="TypeA"/>; may be null.</param>
        public ActorB(IActorRef _probe)
        {
            probe = _probe;

            Receive<TypeA>(message =>
            {
                logger.Debug($"InMessage:{message.Text}");
                probe?.Tell(message);
                reply?.Tell(message);
            });

            // Receiving an actor reference (re)configures the relay target.
            Receive<IActorRef>(actorRef =>
            {
                reply = actorRef;
            });
        }
    }

    /// <summary>
    /// Logs every <see cref="TypeA"/> and <see cref="TypeB"/> it receives and
    /// forwards each to the probe passed to the constructor.
    /// </summary>
    public class ActorC : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected readonly IActorRef probe;

        /// <param name="_probe">Actor that receives a copy of every handled message.</param>
        public ActorC(IActorRef _probe)
        {
            probe = _probe;

            Receive<TypeA>(message =>
            {
                logger.Debug($"InMessage:{message.Text}");
                probe.Tell(message);
            });

            Receive<TypeB>(message =>
            {
                logger.Debug($"InMessage:{message.Text}");
                probe.Tell(message);
            });
        }
    }

    /// <summary>
    /// Exercises event-stream publish/subscribe with three actors, each paired
    /// with its own test probe.
    /// </summary>
    public class EventBusTest : TestKitXunit
    {
        protected TestProbe probeA;
        protected TestProbe probeB;
        protected TestProbe probeC;

        public EventBusTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        /// <summary>Creates the three named test probes used by the test.</summary>
        public void Setup()
        {
            probeA = this.CreateTestProbe("probeA");
            probeB = this.CreateTestProbe("probeB");
            probeC = this.CreateTestProbe("probeC");
        }

        /// <summary>
        /// Publishes one <see cref="TypeA"/> and one <see cref="TypeB"/> event and
        /// verifies which probes observe them. ActorA and ActorB subscribe to
        /// <see cref="TypeA"/>; ActorC subscribes to <see cref="TypeB"/> and also
        /// receives every <see cref="TypeA"/> that ActorB relays to it.
        /// </summary>
        /// <param name="testCount">Supplied by <c>InlineData</c>; not read by the test body.</param>
        [Theory(DisplayName = "이벤트버스 테스트")]
        [InlineData(1)]
        public void Test1(int testCount)
        {
            var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
            var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
            var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

            // Link ActorB to ActorC: ActorB stores the reference and relays each TypeA to it.
            actorB.Tell(actorC);

            // Subscribe
            Sys.EventStream.Subscribe(actorA, typeof(TypeA));
            Sys.EventStream.Subscribe(actorB, typeof(TypeA));
            Sys.EventStream.Subscribe(actorC, typeof(TypeB));

            // Publish
            Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" });
            Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

            // Verify
            probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));

            // ActorC gets TypeB directly from the event stream and TypeA relayed
            // through ActorB. The two messages travel different sender/receiver
            // paths, so their arrival order is not guaranteed; assert on presence
            // rather than on order.
            var receivedByC = probeC.ReceiveN(2, TimeSpan.FromSeconds(1));
            Assert.Contains(receivedByC, m => m is TypeA);
            Assert.Contains(receivedByC, m => m is TypeB);
        }
    }
}
참고원문: