...
```csharp
using System;
using System.Linq;
using Akka;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using KafkaNet;
using KafkaNet.Model;
using KafkaNet.Protocol;

ActorSystem actorSystem;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

private void kafkasendStream()
{
    // Connect to the local Kafka broker.
    kfka_options = new KafkaOptions(new Uri("http://localhost:9092"));
    kfka_router = new BrokerRouter(kfka_options);
    var client = new Producer(kfka_router);

    actorSystem = ActorSystem.Create("ServiceB");
    Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));

    // Create the materializer once; it is disposed when the stream has finished.
    using (var materializer = actorSystem.Materializer())
    {
        // Running sum of the source elements (Scan emits the seed value 1 first).
        var runningSums = source.Scan(1, (acc, next) => acc + next);

        runningSums
            // Let at most one element through per second.
            .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
            .RunForeach(i =>
            {
                Console.WriteLine("SendOne");
                // Fire-and-forget: the Task returned by SendMessageAsync is not awaited.
                client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
            }, materializer)
            .Wait(); // In real service code there is no need to call Wait.
    }
}
```
At-Least-Once Delivery
Using At-Least-Once Delivery to send each message to Kafka exactly once
...