Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.


액터 설계부분

여기서 언급되지 않는 데이터 구조체는, 위에서 언급한 At-Least-Once Delivery에서 가져옵니다.

Code Block
languagec#
themeEmacs
title카프카 전송모듈
public class KafkaSendActor : ReceiveActor
{
    private ILoggingAdapter log = Context.GetLogger();
    protected int messageCnt = 0;
    private KafkaOptions kfka_options;
    private BrokerRouter kfka_router;
    private Producer client;
    private string topic = "test";

    public KafkaSendActor()
    {
        kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
        kfka_router = new BrokerRouter(kfka_options);
        client = new Producer(kfka_router);
        IActorRef parentActor = null;
        
        Receive<Msg>(msg =>
        {
            parentActor = Sender;
            messageCnt++;
            //Test를 위해 메시지 중간에 드랍...
            if ( messageCnt % 2 == 0 )
            {
                log.Warning("Test for Drop Message");
                return;
            }
            log.Debug("Yes I can");
            HandleProducer(msg);                
        });

        Receive<Confirm>(msg =>
        {
            if( msg!=null)
                parentActor.Tell(msg, Self);
        });

    }
    public void HandleProducer( Msg msg)
    {
        Task.Run(async () =>
        {
            List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
            ProduceResponse resOne = reponse[0];
            Confirm confirm = new Confirm(msg.DeliveryId);
            if ( resOne.Error == 0 )
                return confirm;
            else
                return null;                
        }).PipeTo(Self);
    }        
}

...