You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

akka를 통해 kafka를 연동하는 몇가지 유용한

샘플을 작성해보겠습니다.


초당 전송횟수 조절

Akka Stream의 Thtottle 를 사용하며, 입력(소스)처리의 제한없이 

출력(sink,kafka 전송)량을 지정할수가 있습니다.


        ActorSystem actorSystem;
        ActorMaterializer materializer;
        KafkaOptions kfka_options;
        BrokerRouter kfka_router;


        private void kafkasendStream()
        {
            kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
            kfka_router = new BrokerRouter(kfka_options);
            var client = new Producer(kfka_router);

            actorSystem = ActorSystem.Create("ServiceB");
            Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
            using ( var materializer = actorSystem.Materializer() )
            {
                var factorials = source.Scan(1, ( acc, next ) => acc + next);
                factorials                     
                     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
                     .RunForeach(i => {
                         Console.WriteLine("SendOne");
                         client.SendMessageAsync("test", new[] { new Message( i.ToString() ) });
                     } , materializer).Wait();  //실제 서비스코드에서는 Wait를 걸필요는 없습니다.
            }
        }
  • No labels