메시지 전송은 성공 여부체크와 재전송여부에 따라 한번만전송,적어도한번만전송,정확하게 한번만 전송

3가지로 분류할수 있습니다. 정확하게 한번전송은 프로토콜이 책임지지 않으며 개발자가 직접 설계에 반영해야합니다.

여기서는 적어도 한번만 전송하는 제공된 액터를 활용하여, 정확하게 한번 전송을 하려고 노력하는 메시지 액터를 설계해봅니다.

구현하려는 시나리오

  • 배달기사 액터에서 발생한 메시지는 고객에게 전달된다.
  • 고객이 배달 확인을 하면 배달은 완료된다.
  • 고급택배는 고객에게 대면하여 전달하기때문에 전송 실패가 없다.
  • 일반택배는 문앞에 두기때문에, 분실로 인한 배달 실패가 될수 있다.
  • 실패 확인된 택배는 재 전송된다.

메시지 설계

namespace AkkaNetCore.Models.Message
{
    // AtLeastOnceDelivery를 위한 메시지 설계
    public class Msg
    {
        public Msg(long deliveryId, string message)
        {
            DeliveryId = deliveryId;
            Message = message;
        }

        public long DeliveryId { get; }

        public string Message { get; }
    }

    public class Confirm
    {
        public Confirm(long deliveryId)
        {
            DeliveryId = deliveryId;
        }

        public long DeliveryId { get; }
    }

    public interface IEvent
    {

    }

    public class MsgSent : IEvent
    {
        public MsgSent(string message)
        {
            Message = message;
        }

        public string Message { get; }
    }

    public class MsgConfirmed : IEvent
    {
        public MsgConfirmed(long deliveryId)
        {
            DeliveryId = deliveryId;
        }

        public long DeliveryId { get; }
    }
}


택배기사 액터 설계

using Akka.Actor;
using Akka.Event;
using Akka.Persistence;
using AkkaNetCore.Models.Message;

namespace AkkaNetCore.Actors.Study
{
    public class CustomActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        protected int messageCnt = 0;

        public CustomActor()
        {
            Receive<Msg>(msg =>
            {
                messageCnt++;
                if (!msg.Message.Contains("고급"))
                {
                    //일반택배는 자주 분실된다.
                    if (messageCnt % 2 == 0)
                    {
                        logger.Warning("택배를 찾을수 없습니다.");
                        return;
                    }
                }

                //택배를 받았기때문에 완료처리되었다.
                logger.Debug("네, 받았습니다.");
                Sender.Tell(new Confirm(msg.DeliveryId), Self);
            });
        }
    }

    public class DeliveryManActor : AtLeastOnceDeliveryReceiveActor
    {
        private readonly IActorRef _probe;

        private readonly IActorRef _destionationActor;

        private readonly ILoggingAdapter logger = Context.GetLogger();

        public override string PersistenceId { get; } = "persistence-id";

        public DeliveryManActor(IActorRef probe)
        {
            _probe = probe;

            _destionationActor = Context.ActorOf(Props.Create(() => new CustomActor()));

            Recover<MsgSent>(msgSent => Handler(msgSent));

            Recover<MsgConfirmed>(msgConfirmed => Handler(msgConfirmed));

            Command<string>(str =>
            {
                logger.Debug("received:" + str);
                Persist(new MsgSent(str), Handler);
            });

            Command<Confirm>(confirm =>
            {
                //메시지 받음을 확인하고, 해당 메시지를 더이상 안보낸다.
                logger.Debug("received confirm:" + confirm.DeliveryId);
                _probe.Tell("OK");
                Persist(new MsgConfirmed(confirm.DeliveryId), Handler);
            });

        }

        // 목적지에 메시지보냄 - 재전송포함
        private void Handler(MsgSent msgSent)
        {
            logger.Debug("택배발송되었습니다.:" + msgSent.Message);
            Deliver(_destionationActor.Path, l => new Msg(l, msgSent.Message));
        }

        // 메시지가 확인됨
        private void Handler(MsgConfirmed msgConfirmed)
        {
            ConfirmDelivery(msgConfirmed.DeliveryId);
        }
    }
}



유닛 테스트로 기능 확인해보기

using System;
using Akka.Actor;
using Akka.TestKit;
using AkkaNetCore.Actors.Study;
using Xunit;
using Xunit.Abstractions;

namespace AkkaNetCoreTest.Actors
{
    public class AtLeastOnceDeliveryTest : TestKitXunit
    {
        protected TestProbe probe;

        protected IActorRef deliveryManActor;

        public AtLeastOnceDeliveryTest(ITestOutputHelper output) : base(output)
        {            
            Setup();
        }

        public void Setup()
        {
            //여기서 관찰자는 고객이 받은 택배를 카운팅합니다.
            probe = this.CreateTestProbe();

            deliveryManActor = Sys.ActorOf(Props.Create(() => new DeliveryManActor(probe)));

        }

        [Theory]
        [InlineData(3,13000)]
        public void 일반택배는_일정한확률로_분실되며_분실시_재전송된다(int repeat, int cutoff)
        {
            for (int i = 0; i < repeat; i++)
            {
                deliveryManActor.Tell("일반택배발송:" + i);
            }

            Within(TimeSpan.FromMilliseconds(cutoff), () =>
            {
                for (int i = 0; i < repeat; i++)
                {
                    probe.ExpectMsg<string>(TimeSpan.FromMilliseconds(cutoff));
                }
            });
        }

        [Theory]
        [InlineData(100,300)]
        public void 고급택배는_항상성공하여_빠르게모두처리된다(int repeat,int cutoff)
        {
            for (int i = 0; i < repeat; i++)
            {
                deliveryManActor.Tell("고급택배발송:" + i);
            }

            Within(TimeSpan.FromMilliseconds(cutoff), () =>
            {
                for (int i = 0; i < repeat; i++)
                {
                    probe.ExpectMsg<string>(TimeSpan.FromMilliseconds(cutoff));
                }
            });
        }

    }
}

테스트 결과-일반택배는_일정한확률로_분실되며_분실시_재전송된다
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] received:일반택배발송:0
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] 택배발송되었습니다.:일반택배발송:0
[DEBUG][2020-03-15 오후 1:47:54][Thread 0023][akka://test/user/deliveryman/customer] 네, 받았습니다.
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] received:일반택배발송:1
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] 택배발송되었습니다.:일반택배발송:1
[WARNING][2020-03-15 오후 1:47:54][Thread 0022][akka://test/user/deliveryman/customer] 택배를 찾을수 없습니다.
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] received:일반택배발송:2
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] 택배발송되었습니다.:일반택배발송:2
[DEBUG][2020-03-15 오후 1:47:54][Thread 0022][akka://test/user/deliveryman/customer] 네, 받았습니다.
[DEBUG][2020-03-15 오후 1:47:54][Thread 0025][akka://test/user/deliveryman] received confirm:1
[DEBUG][2020-03-15 오후 1:47:54][Thread 0024][akka://test/user/deliveryman] received confirm:3
[WARNING][2020-03-15 오후 1:48:01][Thread 0022][akka://test/user/deliveryman/customer] 택배를 찾을수 없습니다.
[DEBUG][2020-03-15 오후 1:48:06][Thread 0025][akka://test/user/deliveryman/customer] 네, 받았습니다.
[DEBUG][2020-03-15 오후 1:48:06][Thread 0022][akka://test/user/deliveryman] received confirm:2



Kafka의 기능에도 AtleastOnceDeivery 기능이 포함되어있으며 소비자가 수신확인(커밋)을 하여  실패에대한 재전송을 방지하고

정확하게 한번 전송할수 있는 기능에 근접할수 있습니다, Kafka만 사용을 했다면, 이러한 기능이 Kafka에 나온것처럼 보이지만

이론은 Erlang에서 먼저 언급되고 성공을 거둔 기법입니다.

각각 플랫폼이 달라도 메시지 전송을 보증하기위한 컨셉은 크게 다르지 않습니다. ( Erlang의 영향을 모두 받음)



  • No labels