AkkaStream의 조절기를 통해 유입량 조절을 하는것을 앞장에서 살펴보았습니다. 이 Stream장치에 Kafka의 입출력(생산,소비)을 연결이 가능하며 Reactive Stream을 준수합니다. 스트림 처리는 블락이 없어야하며, 유연하게 흘러야 하기때문에 그에 준하는 스트림 시스템에 연결하는것이 효과적입니다. |
셋업방법및 사용방법을 유닛테스트를 통해 구현하였으며
유닛테스트가 스스로 코드를 설명하기때문에, 자세한 설명은 생략하겠습니다.
// Kafka를 Reactive Stream 버전으로 사용하기 // https://github.com/akkadotnet/Akka.Streams.Kafka // https://github.com/akka/alpakka namespace AkkaNetCoreTest.Adapters { // 테스트 목적 : Kafka와 같은 메시지 큐시스템은,Akka의 Stream과 연결하여 Reactive Stream을 준수할수 있습니다. public class AlpakkaTest : TestKitXunit { protected TestProbe probe; protected ProducerSettings<Null,string> producerSettings; protected ConsumerSettings<Null, string> consumerSettings; protected string testTopic; protected IAutoSubscription subscription; protected ActorMaterializer materializer_producer; protected ActorMaterializer materializer_consumer; public AlpakkaTest(ITestOutputHelper output) : base(output) { Setup(); } protected void Setup() { testTopic = "akka100"; subscription = Subscriptions.Topics(testTopic); probe = this.CreateTestProbe(); var config = HoconConfigurationFactory.FromFile("akka.test.conf"); var system_producer = ActorSystem.Create("TestKafka", config); materializer_producer = system_producer.Materializer(); var system_consumer = ActorSystem.Create("TestKafka", config); materializer_consumer = system_producer.Materializer(); this.Sys.Settings.Config.WithFallback(config); producerSettings = ProducerSettings<Null, string>.Create(system_producer, null, null) .WithBootstrapServers("kafka:9092"); consumerSettings = ConsumerSettings<Null, string>.Create(system_consumer, null, null) .WithBootstrapServers("kafka:9092") .WithGroupId("group1"); } [Theory] [InlineData(20,10)] //20 개의 메시지를 생산하고,소비한다,테스트는 10초이내에 완료되어야함(완료시 종료됨) public void 카프카_생산과소비는_일치해야한다(int limit, int cutoff) { string lastSignal = Guid.NewGuid().ToString(); int readyTimeForConsume = 3; int recCnt = 0; KafkaConsumer.CommittableSource(consumerSettings, subscription) .RunForeach(result => { Console.WriteLine($"Consumer: {result.Record.Topic}/{result.Record.Partition} {result.Record.Offset}: {result.Record.Value}"); if (lastSignal == result.Record.Value) probe.Tell("처리모두완료"); result.CommitableOffset.Commit(); }, materializer_consumer); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, limit)); source .Throttle(2, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping) //출력 조절:초당 2개처리-출력조절은 옵션입니다.(Akka Stream) .Select(c => { var result = $"No:{c.ToString()}"; if(c == limit) { result = lastSignal; } return result; }) .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>(testTopic, elem))) .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings)) .Select(result => { var response = result as Result<Null, string, NotUsed>; Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}"); return result; }) .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer_producer); Within(TimeSpan.FromSeconds(cutoff), () => { probe.ExpectMsg("처리모두완료", TimeSpan.FromSeconds(cutoff)); }); } } } |
참고: