You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

AkkaStream의 조절기를 통해 유입량 조절을 하는것을 앞장에서 살펴보았습니다.

이 Stream장치에 Kafka의 입출력(생산,소비)을 연결해보겠습니다. 스트림은 그에준하는 스트림 시스템에 연결하는것이 효과적입니다.

카프카와 아카스트림 연결하기

셋업방법및 사용방법을 유닛테스트를 통해 구현하였으며

유닛테스트가 스스로 코드를 설명하기때문에, 자세한 설명은 생략하겠습니다.

// Kafka를 Reactive Stream 버전으로 사용하기
// https://github.com/akkadotnet/Akka.Streams.Kafka
// https://github.com/akka/alpakka

namespace AkkaNetCoreTest.Adapters
{
    public class AlpakkaTest : TestKitXunit
    {
        protected TestProbe probe;

        protected ProducerSettings<Null,string> producerSettings;

        protected ConsumerSettings<Null, string> consumerSettings;

        protected string testTopic;

        protected IAutoSubscription subscription;

        protected ActorMaterializer materializer_producer;
        protected ActorMaterializer materializer_consumer;

        public AlpakkaTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        protected void Setup()
        {
            testTopic = "akka100";

            subscription = Subscriptions.Topics(testTopic);

            probe = this.CreateTestProbe();

            var config = HoconConfigurationFactory.FromFile("akka.kafka.conf");
            //akka.loglevel = DEBUG
            //akka.suppress - json - serializer - warning = true

            var system_producer = ActorSystem.Create("TestKafka", config);
            materializer_producer = system_producer.Materializer();

            var system_consumer = ActorSystem.Create("TestKafka", config);
            materializer_consumer = system_producer.Materializer();

            this.Sys.Settings.Config.WithFallback(config);

            producerSettings = ProducerSettings<Null, string>.Create(system_producer, null, null)
                .WithBootstrapServers("kafka:9092");                

            consumerSettings = ConsumerSettings<Null, string>.Create(system_consumer, null, null)
                .WithBootstrapServers("kafka:9092")
                .WithGroupId("group1");

        }

        [Theory]
        [InlineData(20,10)] //20 개의 메시지를 생산하고,소비한다,테스트는 10초이내에 완료되어야함(완료시 종료됨)
        public void ProduceAndConsumeAreOK(int testCnt, int cutoff)
        {            
            int readyTimeForConsume = 3;                        

            int recCnt = 0;
            KafkaConsumer.PlainSource(consumerSettings, subscription)
            .RunForeach(result =>
            {
                Console.WriteLine($"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
                recCnt++;
                if (recCnt == testCnt)
                    probe.Tell("카프카수신OK");
            }, materializer_consumer);

            Task.Delay(TimeSpan.FromSeconds(readyTimeForConsume)).Wait();

            Source<int, NotUsed> source = Source.From(Enumerable.Range(1, testCnt));

            source                
                .Select(c => c.ToString())
                .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>(testTopic, elem)))
                .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
                .Select(result =>
                {
                    var response = result as Result<Null, string, NotUsed>;
                    Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
                    return result;
                })
                .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer_producer);
            
            Within(TimeSpan.FromSeconds(cutoff), () =>
            {
                probe.ExpectMsg("카프카수신OK",TimeSpan.FromSeconds(cutoff));
            });
        }
    }
}


참고:

  • No labels