You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

akka.net을 통해 kafka를 연동하는 몇가지 유용한

샘플을 작성해보겠습니다.


초당 전송횟수 조절

Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이 

출력(sink,kafka 전송)량을 지정할수가 있습니다.


ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);

actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);

factorials
     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
     .RunForeach(i => {
         Console.WriteLine("SendOne");
         client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
     }, materializer);

At-Least-Once Delivery

At-Least-Once Delivery 를 활용하여  Kafka에게 메시지를 꼭 한번만 전송하는

모듈을 작성해보겠습니다.


TestCase :

  • Kafka에 메시지 20개를 보낸다.
  • 테스트를 위해 짝수번째 메시지는, 중간에 드롭을 시킨다.
  • 전송부쪽에서는, Kafka의 성공메시지를 못받았기때문에 홀수번째 메시지를 다시 보낸다.
  • 결국 20개의 메시지가 모두 성공한다.

사용부분

            actorSystem = ActorSystem.Create("ServiceB");
            var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
            for(int i=0; i<20;i++ )
            {
                kafkaActor.Tell("can you speak english? :" + i);
            }

전송부분은 액터의 심플한 전송을 그대로 사용합니다.

-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.


액터 설계부분

카프카 전송모듈
public class KafkaSendActor : ReceiveActor
{
    private ILoggingAdapter log = Context.GetLogger();
    protected int messageCnt = 0;
    private KafkaOptions kfka_options;
    private BrokerRouter kfka_router;
    private Producer client;
    private string topic = "test";

    public KafkaSendActor()
    {
        kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
        kfka_router = new BrokerRouter(kfka_options);
        client = new Producer(kfka_router);
        IActorRef parentActor = null;
        
        Receive<Msg>(msg =>
        {
            parentActor = Sender;
            messageCnt++;
            //Test를 위해 메시지 중간에 드랍...
            if ( messageCnt % 2 == 0 )
            {
                log.Warning("Test for Drop Message");
                return;
            }
            log.Debug("Yes I can");
            HandleProducer(msg);                
        });

        Receive<Confirm>(msg =>
        {
            if( msg!=null)
                parentActor.Tell(msg, Self);
        });

    }
    public void HandleProducer( Msg msg)
    {
        Task.Run(async () =>
        {
            List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
            ProduceResponse resOne = reponse[0];
            Confirm confirm = new Confirm(msg.DeliveryId);
            if ( resOne.Error == 0 )
                return confirm;
            else
                return null;                
        }).PipeTo(Self);
    }        
}

카프카의 비동기 결과처리를 Pipe를 통해 연결하여 비동기처리가 유지되돌

카프카 전송모듈을 작성하고, 카프카로부터 전송 성공메시지를 받았을시 부모액터에게

해당메시지는 잘 처리되었다는 승인 메시지를 보냅니다. 승인메시지를 보내지 못할시 

부모액터는 실패한메시지를 재전송을 시도하게될것입니다.



재전송을 책임지는 액터
public class KafkaAtLeastOnceDeliveryReceiveActor : AtLeastOnceDeliveryReceiveActor
{
    private readonly IActorRef _destionationActor = Context.ActorOf<KafkaSendActor>();
    private ILoggingAdapter log = Context.GetLogger();

    public KafkaAtLeastOnceDeliveryReceiveActor()
    {
        Recover<MsgSent>(msgSent => Handler(msgSent));
        Recover<MsgConfirmed>(msgConfirmed => Handler(msgConfirmed));

        Command<string>(str =>
        {
            log.Debug("received:" + str);
            Persist(new MsgSent(str), Handler);
        });

        Command<Confirm>(confirm =>
        {
            //메시지 받음을 확인하고, 해당 메시지를 더이상 안보낸다.
            log.Debug("received confirm:" + confirm.DeliveryId);
            Persist(new MsgConfirmed(confirm.DeliveryId), Handler);
        });
    }

    private void Handler( MsgSent msgSent )
    {
        Deliver(_destionationActor.Path, l => new Msg(l, msgSent.Message));
    }

    private void Handler( MsgConfirmed msgConfirmed )
    {
        ConfirmDelivery(msgConfirmed.DeliveryId);
    }

    public override string PersistenceId { get; } = "persistence-id";
}

재전송 모듈 작성은 간단합니다.   AtLeastOnceDeliveryReceiveActor 라는 준비된 액터가 있어서 이것을 상속받아

우리가 원하는 몇가지 메시지 기능을 연결하면 됩니다.










  • No labels