Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

akka.net을 통해 kafka를 연동하는 몇가지 유용한

샘플을 작성해보겠습니다.


Table of Contents

초당 전송횟수 조절

Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이 

출력(sink,kafka 전송)량을 지정할수가 있습니다.

Code Block
languagec#
themeEmacs
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);

actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);

factorials
     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
     .RunForeach(i => {
         Console.WriteLine("SendOne");
         client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
     }, materializer);

At-Least-Once Delivery

At-Least-Once Delivery 를 활용하여  Kafka에게 메시지를 꼭 한번만 전송하는

...

우리가 원하는 몇가지 메시지 기능을 연결하면 됩니다.


초당 전송횟수 조절

Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이 

출력(sink,kafka 전송)량을 지정할수가 있습니다.


Code Block
languagec#
themeEmacs
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);

actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);

factorials
     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
     .RunForeach(i => {
         Console.WriteLine("SendOne");
         client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
     }, materializer);