앞 장에서는 Akka Stream의 조절기를 통해 유입량을 조절하는 방법을 살펴보았습니다.
이 Stream 장치에 Kafka의 입출력(생산, 소비)을 연결해 보겠습니다. 스트림은 그에 준하는 스트림 시스템에 연결하는 것이 효과적입니다.
카프카와 아카스트림 연결하기
셋업 방법 및 사용 방법을 유닛 테스트를 통해 구현하였으며,
유닛 테스트가 스스로 코드를 설명하기 때문에 자세한 설명은 생략하겠습니다.
// Using Kafka through its Reactive Streams (Akka.Streams.Kafka) adapter.
// https://github.com/akkadotnet/Akka.Streams.Kafka
// https://github.com/akka/alpakka
namespace AkkaNetCoreTest.Adapters
{
    /// <summary>
    /// Round-trip test that produces messages to a Kafka topic through an Akka stream
    /// and consumes them back through a second Akka stream.
    /// </summary>
    /// <remarks>
    /// The test talks to a broker at <c>kafka:9092</c> and loads <c>akka.kafka.conf</c>
    /// from disk, so both must be available when the test runs.
    /// </remarks>
    public class AlpakkaTest : TestKitXunit
    {
        // Broker address shared by the producer and the consumer settings.
        private const string BootstrapServers = "kafka:9092";

        protected TestProbe probe;

        protected ProducerSettings<Null, string> producerSettings;

        protected ConsumerSettings<Null, string> consumerSettings;

        protected string testTopic;

        protected IAutoSubscription subscription;

        protected ActorMaterializer materializer_producer;

        protected ActorMaterializer materializer_consumer;

        /// <summary>Creates the test fixture and prepares the Kafka producer/consumer settings.</summary>
        /// <param name="output">xUnit output helper, forwarded to the base test kit.</param>
        public AlpakkaTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        /// <summary>
        /// Initializes the topic, subscription, probe, the two actor systems with their
        /// materializers, and the producer/consumer settings used by the tests.
        /// </summary>
        protected void Setup()
        {
            testTopic = "akka100";
            subscription = Subscriptions.Topics(testTopic);
            probe = this.CreateTestProbe();

            var config = HoconConfigurationFactory.FromFile("akka.kafka.conf");
            // Options that can be placed in the config file:
            //   akka.loglevel = DEBUG
            //   akka.suppress-json-serializer-warning = true

            var system_producer = ActorSystem.Create("TestKafka", config);
            materializer_producer = system_producer.Materializer();

            var system_consumer = ActorSystem.Create("TestKafka", config);
            // Fix: the consumer materializer must come from the consumer system; the original
            // created it from system_producer, so the consumer stream ran on the wrong system.
            materializer_consumer = system_consumer.Materializer();

            // NOTE(review): the return value is discarded. If WithFallback returns a new Config
            // instead of mutating the receiver, this statement has no effect - confirm.
            this.Sys.Settings.Config.WithFallback(config);

            producerSettings = ProducerSettings<Null, string>.Create(system_producer, null, null)
                .WithBootstrapServers(BootstrapServers);

            consumerSettings = ConsumerSettings<Null, string>.Create(system_consumer, null, null)
                .WithBootstrapServers(BootstrapServers)
                .WithGroupId("group1");
        }

        /// <summary>
        /// Produces <paramref name="testCnt"/> messages and expects the consumer stream to
        /// report receipt of that many messages within <paramref name="cutoff"/> seconds.
        /// </summary>
        /// <param name="testCnt">Number of messages to produce and to wait for.</param>
        /// <param name="cutoff">Time limit, in seconds, for the confirmation message to arrive.</param>
        [Theory]
        [InlineData(20, 10)] // Produce and consume 20 messages; must complete within 10 seconds (ends as soon as it completes).
        public void ProduceAndConsumeAreOK(int testCnt, int cutoff)
        {
            int readyTimeForConsume = 3;
            int recCnt = 0;

            // Start the consumer first; it notifies the probe once testCnt records have arrived.
            KafkaConsumer.PlainSource(consumerSettings, subscription)
                .RunForeach(result =>
                {
                    Console.WriteLine($"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
                    recCnt++;
                    if (recCnt == testCnt)
                    {
                        probe.Tell("카프카수신OK");
                    }
                }, materializer_consumer);

            // Give the consumer a few seconds before producing anything.
            Task.Delay(TimeSpan.FromSeconds(readyTimeForConsume)).Wait();

            Source<int, NotUsed> source = Source.From(Enumerable.Range(1, testCnt));

            source
                .Select(c => c.ToString())
                .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>(testTopic, elem)))
                .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
                .Select(result =>
                {
                    // Only log when the element really is a single-message result; the original
                    // dereferenced the 'as' cast unconditionally and could throw on null.
                    var response = result as Result<Null, string, NotUsed>;
                    if (response != null)
                    {
                        Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
                    }

                    return result;
                })
                .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer_producer);

            Within(TimeSpan.FromSeconds(cutoff), () =>
            {
                probe.ExpectMsg("카프카수신OK", TimeSpan.FromSeconds(cutoff));
            });
        }
    }
}
참고: