AkkaStream의 조절기를 통해 유입량 조절을 하는것을 앞장에서 살펴보았습니다.

이 Stream장치에 Kafka의 입출력(생산,소비)을 연결이 가능하며 Reactive Stream을 준수합니다.

스트림 처리는 블락이 없어야하며, 유연하게 흘러야 하기때문에 그에 준하는 스트림 시스템에 연결하는것이 효과적입니다.

카프카와 아카스트림 연결하기

셋업방법및 사용방법을 유닛테스트를 통해 구현하였으며

유닛테스트가 스스로 코드를 설명하기때문에, 자세한 설명은 생략하겠습니다.

// Kafka를 Reactive Stream 버전으로 사용하기
// https://github.com/akkadotnet/Akka.Streams.Kafka
// https://github.com/akka/alpakka

namespace AkkaNetCoreTest.Adapters
{
    // 테스트 목적 : Kafka와 같은 메시지 큐시스템은,Akka의 Stream과 연결하여 Reactive Stream을 준수할수 있습니다.
    public class AlpakkaTest : TestKitXunit
    {
        protected TestProbe probe;

        protected ProducerSettings<Null,string> producerSettings;

        protected ConsumerSettings<Null, string> consumerSettings;

        protected string testTopic;

        protected IAutoSubscription subscription;

        protected ActorMaterializer materializer_producer;
        protected ActorMaterializer materializer_consumer;

        public AlpakkaTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        protected void Setup()
        {
            testTopic = "akka100";

            subscription = Subscriptions.Topics(testTopic);

            probe = this.CreateTestProbe();

            var config = HoconConfigurationFactory.FromFile("akka.test.conf");
            
            var system_producer = ActorSystem.Create("TestKafka", config);
            materializer_producer = system_producer.Materializer();

            var system_consumer = ActorSystem.Create("TestKafka", config);
            materializer_consumer = system_producer.Materializer();

            this.Sys.Settings.Config.WithFallback(config);

            producerSettings = ProducerSettings<Null, string>.Create(system_producer, null, null)
                .WithBootstrapServers("kafka:9092");

            consumerSettings = ConsumerSettings<Null, string>.Create(system_consumer, null, null)
                .WithBootstrapServers("kafka:9092")
                .WithGroupId("group1");                

        }

        [Theory]
        [InlineData(20,10)] //20 개의 메시지를 생산하고,소비한다,테스트는 10초이내에 완료되어야함(완료시 종료됨)
        public void 카프카_생산과소비는_일치해야한다(int limit, int cutoff)
        {
            string lastSignal = Guid.NewGuid().ToString();
            int readyTimeForConsume = 3;
            int recCnt = 0;

            KafkaConsumer.CommittableSource(consumerSettings, subscription)
            .RunForeach(result =>
            {
                Console.WriteLine($"Consumer: {result.Record.Topic}/{result.Record.Partition} {result.Record.Offset}: {result.Record.Value}");                
                if (lastSignal == result.Record.Value)
                    probe.Tell("처리모두완료");

                result.CommitableOffset.Commit();

            }, materializer_consumer);
            
            Source<int, NotUsed> source = Source.From(Enumerable.Range(1, limit));
            
            source
                .Throttle(2, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)      //출력 조절:초당 2개처리-출력조절은 옵션입니다.(Akka Stream)
                .Select(c => 
                {
                    var result = $"No:{c.ToString()}";
                    if(c == limit)
                    {
                        result = lastSignal;
                    }                    
                    return result;
                })                
                .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>(testTopic, elem)))
                .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
                .Select(result =>
                {
                    var response = result as Result<Null, string, NotUsed>;
                    Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
                    return result;
                })
                .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer_producer);

            Within(TimeSpan.FromSeconds(cutoff), () =>
            {
                probe.ExpectMsg("처리모두완료", TimeSpan.FromSeconds(cutoff));
            });
        }
    }
}


참고:


  • No labels