Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

akka.net을 통해 kafka를 연동하는 몇가지 유용한

샘플을 작성해보겠습니다.


Table of Contents

초당 전송횟수 조절

Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이 

...

  • Kafka에 메시지 20개를 보낸다.
  • 테스트를 위해 짝수번째 메시지는, 중간에 드롭을 시킨다.
  • 전송부쪽에서는, Kafka의 성공메시지를 못받았기때문에 홀수번째 메시지를 다시 보낸다.
  • 결국 20개의 메시지가 모두 성공한다.

사용부분

Code Block
languagec#
themeEmacs
            actorSystem = ActorSystem.Create("ServiceB");
            var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
            for(int i=0; i<20;i++ )
            {
                kafkaActor.Tell("can you speak english? :" + i);
            }

...

-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.


액터 설계부분

Code Block
languagec#
themeEmacs
title카프카 전송모듈
public class KafkaSendActor : ReceiveActor
{
    private ILoggingAdapter log = Context.GetLogger();
    protected int messageCnt = 0;
    private KafkaOptions kfka_options;
    private BrokerRouter kfka_router;
    private Producer client;
    private string topic = "test";

    public KafkaSendActor()
    {
        kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
        kfka_router = new BrokerRouter(kfka_options);
        client = new Producer(kfka_router);
        IActorRef parentActor = null;
        
        Receive<Msg>(msg =>
        {
            parentActor = Sender;
            messageCnt++;
            //Test를 위해 메시지 중간에 드랍...
            if ( messageCnt % 2 == 0 )
            {
                log.Warning("Test for Drop Message");
                return;
            }
            log.Debug("Yes I can");
            HandleProducer(msg);                
        });

        Receive<Confirm>(msg =>
        {
            if( msg!=null)
                parentActor.Tell(msg, Self);
        });

    }
    public void HandleProducer( Msg msg)
    {
        Task.Run(async () =>
        {
            List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
            ProduceResponse resOne = reponse[0];
            Confirm confirm = new Confirm(msg.DeliveryId);
            if ( resOne.Error == 0 )
                return confirm;
            else
                return null;                
        }).PipeTo(Self);
    }        
}

...