메시지 처리패턴의 한가지인 이벤트버스는, 다양한 메시지 큐시스템이 대부분 지원하며

채널별 메시지 발행(Publish)/구독(SubScribe) 기능을 가지고 있습니다.

Akka에서는 EventStream을 활용한 EventBus 유틸리티를 통해

유연하고 심플한 이벤트 필터처리가 가능한 Actor생성이 가능합니다.

구현 시나리오

  • ActorA와 ActorB는 TypeA메시지를 구독합니다.
  • ActorC는 TypeB메시지를 구독합니다.
  • ActorB가 받은 TypeA 메시지는 ActorC에게 전달됩니다.

이벤트 버스를 활용한 필터분기를 대상 액터에게 심플하게 지정할수 있으며

Actor의 기본메시지 전달기능을 통해, 커스텀한 Reply 기능 탑재도 가능합니다.


유닛테스트기를 통해 이벤트버스기능 살펴보기

다이어그램에 설계한 메시지 Flow를 구현하고, 유니테스트를 통해 메시지 검증을 할수 있습니다.

  • TypeA,TypeB 메시지를 각각 1번씩 발송합니다.
  • ActorA는 TypeA의 메시지를 1번 받습니다.
  • ActorB는 TypeA의 메시지를 1번 받습니다.
  • ActorC는 TypeB,TypA의 메시지를 각각 1번씩 받습니다.
        [Theory(DisplayName = "이벤트버스 테스트")]
        [InlineData(1)]
        public void Test1(int testCount)
        {
            var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
            var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
            var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

            // Link ActorB to ActorC
            actorB.Tell(actorC);

            //Subscribe
            Sys.EventStream.Subscribe(actorA, typeof(TypeA));
            Sys.EventStream.Subscribe(actorB, typeof(TypeA));
            Sys.EventStream.Subscribe(actorC, typeof(TypeB));

            //Publish
            Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" } );
            Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

            //UnitTest
            probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeB>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
        }



전체코드

git : https://github.com/psmon/AkkaDotModule/blob/master/TestAkkaDotModule/TestActors/EventBusTest.cs

using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using AkkaNetCoreTest;
using System;
using Xunit;
using Xunit.Abstractions;

namespace TestAkkaDotModule.TestActors
{
    // https://getakka.net/articles/utilities/event-bus.html

    public class TypeA
    {
        public string Text { get; set; }        

    }
    public class TypeB
    {
        public string Text { get; set; }
    }

    public class ActorA : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected IActorRef probe;

        public ActorA(IActorRef _probe)
        {
            probe = _probe;

            ReceiveAsync<TypeA>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                probe.Tell(message);

            });
        }
    }

    public class ActorB : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected IActorRef probe;

        protected IActorRef reply;

        public ActorB(IActorRef _probe)
        {
            probe = _probe;

            ReceiveAsync<TypeA>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                if(probe!=null) probe.Tell(message);

                if(reply!=null) reply.Tell(message);

            });

            ReceiveAsync<IActorRef>(async actorRef =>
            {
                reply = actorRef;
            });

        }
    }

    public class ActorC : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected IActorRef probe;

        public ActorC(IActorRef _probe)
        {
            probe = _probe;

            ReceiveAsync<TypeA>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                probe.Tell(message);

            });

            ReceiveAsync<TypeB>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                probe.Tell(message);

            });
        }
    }

    public class EventBusTest : TestKitXunit
    {
        protected TestProbe probeA;

        protected TestProbe probeB;

        protected TestProbe probeC;

        public EventBusTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        public void Setup()
        {
            probeA = this.CreateTestProbe("probeA");
            probeB = this.CreateTestProbe("probeB");
            probeC = this.CreateTestProbe("probeC");

        }

        [Theory(DisplayName = "이벤트버스 테스트")]
        [InlineData(1)]
        public void Test1(int testCount)
        {
            var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
            var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
            var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

            // Link ActorB to ActorC
            actorB.Tell(actorC);

            //Subscribe
            Sys.EventStream.Subscribe(actorA, typeof(TypeA));
            Sys.EventStream.Subscribe(actorB, typeof(TypeA));
            Sys.EventStream.Subscribe(actorC, typeof(TypeB));

            //Publish
            Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" } );
            Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

            //UnitTest
            probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeB>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
        }
    }
}


참고원문:


  • No labels