Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

메시지 처리패턴의 한가지인 이벤트버스는, 다양한 메시지 큐시스템이 대부분 지원하며

채널별 메시지 발행(Publish)/구독(SubScribe) 기능을 가지고 있습니다.

Akka에서는 EventStream을 활용한 EventBus 유틸리티를 통해

유연하고 심플한 이벤트 필터처리가 가능한 Actor생성이 가능합니다.


이벤트 버스를 활용한 필터분기를 대상 액터에게 심플하게 지정할수 있으며

Actor의 기본메시지 전달기능을 통해, 커스텀한 Reply 기능 탑재도 가능합니다.



Code Block
themeEmacs
        [Theory(DisplayName = "이벤트버스 테스트")]
        [InlineData(1)]
        public void Test1(int testCount)
        {
            var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
            var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
            var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

            // Link ActorB to ActorC
            actorB.Tell(actorC);

            //Subscribe
            Sys.EventStream.Subscribe(actorA, typeof(TypeA));
            Sys.EventStream.Subscribe(actorB, typeof(TypeA));
            Sys.EventStream.Subscribe(actorC, typeof(TypeB));

            //Publish
            Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" } );
            Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

            //UnitTest
            probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeB>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
        }


전체코드

Code Block
themeEmacs
linenumberstrue
collapsetrue
using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using AkkaNetCoreTest;
using System;
using Xunit;
using Xunit.Abstractions;

namespace TestAkkaDotModule.TestActors
{
    // https://getakka.net/articles/utilities/event-bus.html

    public class TypeA
    {
        public string Text { get; set; }        

    }
    public class TypeB
    {
        public string Text { get; set; }
    }

    public class ActorA : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected IActorRef probe;

        public ActorA(IActorRef _probe)
        {
            probe = _probe;

            ReceiveAsync<TypeA>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                probe.Tell(message);

            });
        }
    }

    public class ActorB : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected IActorRef probe;

        protected IActorRef reply;

        public ActorB(IActorRef _probe)
        {
            probe = _probe;

            ReceiveAsync<TypeA>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                if(probe!=null) probe.Tell(message);

                if(reply!=null) reply.Tell(message);

            });

            ReceiveAsync<IActorRef>(async actorRef =>
            {
                reply = actorRef;
            });

        }
    }

    public class ActorC : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected IActorRef probe;

        public ActorC(IActorRef _probe)
        {
            probe = _probe;

            ReceiveAsync<TypeA>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                probe.Tell(message);

            });

            ReceiveAsync<TypeB>(async message =>
            {
                logger.Debug($"InMessage:{message.Text}");

                probe.Tell(message);

            });
        }
    }

    public class EventBusTest : TestKitXunit
    {
        protected TestProbe probeA;

        protected TestProbe probeB;

        protected TestProbe probeC;

        public EventBusTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        public void Setup()
        {
            probeA = this.CreateTestProbe("probeA");
            probeB = this.CreateTestProbe("probeB");
            probeC = this.CreateTestProbe("probeC");

        }

        [Theory(DisplayName = "이벤트버스 테스트")]
        [InlineData(1)]
        public void Test1(int testCount)
        {
            var actorA = Sys.ActorOf(Props.Create(() => new ActorA(probeA)));
            var actorB = Sys.ActorOf(Props.Create(() => new ActorB(probeB)));
            var actorC = Sys.ActorOf(Props.Create(() => new ActorC(probeC)));

            // Link ActorB to ActorC
            actorB.Tell(actorC);

            //Subscribe
            Sys.EventStream.Subscribe(actorA, typeof(TypeA));
            Sys.EventStream.Subscribe(actorB, typeof(TypeA));
            Sys.EventStream.Subscribe(actorC, typeof(TypeB));

            //Publish
            Sys.EventStream.Publish(new TypeA() { Text = "TypeA_Message" } );
            Sys.EventStream.Publish(new TypeB() { Text = "TypeB_Message" });

            //UnitTest
            probeA.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeB.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeB>(TimeSpan.FromSeconds(1));
            probeC.ExpectMsg<TypeA>(TimeSpan.FromSeconds(1));
        }
    }
}