Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
themeEmacs
// Using Kafka through its Reactive Streams (Akka Streams) connector
// https://github.com/akkadotnet/Akka.Streams.Kafka
// https://github.com/akka/alpakka

namespace AkkaNetCoreTest.Adapters
{
    /// <summary>
    /// Integration test showing that a message queue such as Kafka can be connected to
    /// Akka Streams, so that producing and consuming follow the Reactive Streams model.
    /// </summary>
    /// <remarks>
    /// The test expects a Kafka broker reachable at <c>kafka:9092</c> and a HOCON file
    /// named <c>akka.test.conf</c> that <c>HoconConfigurationFactory.FromFile</c> can load.
    /// </remarks>
    public class AlpakkaTest : TestKitXunit
    {
        /// <summary>Probe that receives the completion signal sent by the consumer stream.</summary>
        protected TestProbe probe;

        /// <summary>Kafka producer settings (null key, string value).</summary>
        protected ProducerSettings<Null,string> producerSettings;

        /// <summary>Kafka consumer settings (null key, string value).</summary>
        protected ConsumerSettings<Null, string> consumerSettings;

        /// <summary>Name of the Kafka topic used by both the producer and the consumer.</summary>
        protected string testTopic;

        /// <summary>Topic subscription handed to the consumer source.</summary>
        protected IAutoSubscription subscription;

        /// <summary>Materializer that runs the producer stream.</summary>
        protected ActorMaterializer materializer_producer;

        /// <summary>Materializer that runs the consumer stream.</summary>
        protected ActorMaterializer materializer_consumer;

        /// <summary>
        /// Creates the test fixture and prepares the Kafka settings and materializers.
        /// </summary>
        /// <param name="output">xUnit output helper forwarded to the base test kit.</param>
        public AlpakkaTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        /// <summary>
        /// Initializes the topic, subscription, probe, the two actor systems with their
        /// materializers, and the Kafka producer/consumer settings.
        /// </summary>
        protected void Setup()
        {
            testTopic = "akka100";
            subscription = Subscriptions.Topics(testTopic);
            probe = this.CreateTestProbe();

            var config = HoconConfigurationFactory.FromFile("akka.test.conf");

            // NOTE(review): neither actor system created here is terminated anywhere in this
            // class; consider shutting them down in the test teardown.
            var producerSystem = ActorSystem.Create("TestKafka", config);
            materializer_producer = producerSystem.Materializer();

            var consumerSystem = ActorSystem.Create("TestKafka", config);
            // Fix: the consumer materializer was previously created from the producer system,
            // which left the consumer actor system without any running stream.
            materializer_consumer = consumerSystem.Materializer();

            // The original code called this.Sys.Settings.Config.WithFallback(config) here and
            // discarded the result. WithFallback returns a new Config instead of mutating the
            // receiver, so the call had no effect and has been removed.

            producerSettings = ProducerSettings<Null, string>.Create(producerSystem, null, null)
                .WithBootstrapServers("kafka:9092");

            consumerSettings = ConsumerSettings<Null, string>.Create(consumerSystem, null, null)
                .WithBootstrapServers("kafka:9092")
                .WithGroupId("group1");
        }

        /// <summary>
        /// Produces <paramref name="limit"/> messages to Kafka and verifies that the consumer
        /// sees the final one within <paramref name="cutoff"/> seconds.
        /// </summary>
        /// <param name="limit">Number of messages to produce.</param>
        /// <param name="cutoff">Maximum time, in seconds, to wait for the completion signal.</param>
        [Theory]
        [InlineData(20,10)] // Produce and consume 20 messages; the test must finish within 10 seconds (it ends as soon as it completes).
        public void 카프카_생산과소비는_일치해야한다(int limit, int cutoff)
        {
            // Message the consumer forwards to the probe once the last record has arrived.
            const string doneSignal = "처리모두완료";

            // Unique payload for the final record so the consumer can recognize the end of this run.
            string lastSignal = Guid.NewGuid().ToString();

            // Consumer side: log every record, notify the probe on the final one, then commit the offset.
            KafkaConsumer.CommittableSource(consumerSettings, subscription)
            .RunForeach(result =>
            {
                Console.WriteLine($"Consumer: {result.Record.Topic}/{result.Record.Partition} {result.Record.Offset}: {result.Record.Value}");
                if (lastSignal == result.Record.Value)
                {
                    probe.Tell(doneSignal);
                }

                result.CommitableOffset.Commit();

            }, materializer_consumer);

            Source<int, NotUsed> source = Source.From(Enumerable.Range(1, limit));

            // Producer side: emit "No:<n>" for each number and the unique signal for the last one.
            // NOTE(review): at 2 messages per second, 20 messages take close to 10 seconds to emit,
            // which leaves little headroom under a 10-second cutoff.
            source
                .Throttle(2, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)      // Rate limiting: 2 per second. Throttling is optional (Akka Streams).
                .Select(c => c == limit ? lastSignal : $"No:{c}")
                .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>(testTopic, elem)))
                .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
                .Select(result =>
                {
                    // Only single-message results carry delivery metadata; skip logging otherwise
                    // instead of dereferencing a null cast result.
                    var response = result as Result<Null, string, NotUsed>;
                    if (response != null)
                    {
                        Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
                    }
                    return result;
                })
                .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer_producer);

            Within(TimeSpan.FromSeconds(cutoff), () =>
            {
                probe.ExpectMsg(doneSignal, TimeSpan.FromSeconds(cutoff));
            });
        }
    }
}

...