Page History
...
출력(sink,kafka 전송)량을 지정할수가 있습니다.
고전적인 방법에서는 타이머로 횟수를 제한하거나? ( 복잡도 증가 )
강제로 전송시 Sleep을 주었을 것입니다. ( 제어 주기조절이 어렵고, 블락발생)
AkkaStream은 처리 흐름제어를 조금더 유연하게 할수가 있습니다.
| Code Block | ||||
|---|---|---|---|---|
| ||||
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;
kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);
actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);
factorials
.Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
.RunForeach(i => {
Console.WriteLine("SendOne");
client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
}, materializer); |
과제
- 위 두가지 기능을 합하기 ( 전송보장 + 흐름 속도제어)
- 역압력설정:카프카의 입력처리량보다 성공처리량이 늦을시 흐름속력을 늦춤 ( 성공승인 메시지 시간은 카프카 처리 최대 타임아웃으로 지정)