Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

출력(sink,kafka 전송)량을 지정할수가 있습니다.

고전적인 방법에서는 타이머로 횟수를 제한하거나? ( 복잡도 증가 )

강제로 전송시 Sleep을 주었을 것입니다. ( 제어 주기조절이 어렵고, 블락발생)

AkkaStream은 처리 흐름제어를 조금더 유연하게 할수가 있습니다.


Code Block
languagec#
themeEmacs
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);

actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);

factorials
     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
     .RunForeach(i => {
         Console.WriteLine("SendOne");
         client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
     }, materializer);


과제

  • 위 두가지 기능을 합하기 ( 전송보장 + 흐름 속도제어)
  • 역압력설정:카프카의 입력처리량보다 성공처리량이 늦을시 흐름속력을 늦춤 ( 성공승인 메시지 시간은 카프카 처리 최대 타임아웃으로 지정)