You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

앞 장에서는 Akka Streams의 조절기(throttle)를 통해 유입량을 조절하는 방법을 살펴보았습니다.

이 Stream 장치에는 Kafka의 입출력(생산, 소비)을 연결할 수 있으며, 이 연결은 Reactive Streams 규격을 준수합니다.

스트림 처리는 블로킹이 없어야 하고 유연하게 흘러야 하기 때문에, 그에 준하는 스트림 시스템에 연결하는 것이 효과적입니다.

카프카와 아카스트림 연결하기

셋업 방법 및 사용 방법은 유닛 테스트를 통해 구현하였으며,

유닛 테스트가 스스로 코드를 설명하기 때문에 자세한 설명은 생략하겠습니다.

// Kafka를 Reactive Stream 버전으로 사용하기
// https://github.com/akkadotnet/Akka.Streams.Kafka
// https://github.com/akka/alpakka

namespace AkkaNetCoreTest.Adapters
{
    /// <summary>
    /// Integration test that connects Akka.Streams to Kafka through Akka.Streams.Kafka:
    /// one stream produces messages to a topic and another consumes them.
    /// Requires a broker reachable at <c>kafka:9092</c> and an <c>akka.kafka.conf</c> HOCON file.
    /// </summary>
    public class AlpakkaTest : TestKitXunit
    {
        // Broker address shared by the producer and the consumer settings.
        private const string BootstrapServers = "kafka:9092";

        // Message sent to the probe once the expected number of records was consumed.
        private const string ReceivedAllMessage = "카프카수신OK";

        protected TestProbe probe;

        protected ProducerSettings<Null,string> producerSettings;

        protected ConsumerSettings<Null, string> consumerSettings;

        protected string testTopic;

        protected IAutoSubscription subscription;

        protected ActorMaterializer materializer_producer;
        protected ActorMaterializer materializer_consumer;

        /// <summary>Creates the test fixture and prepares the Kafka producer/consumer settings.</summary>
        /// <param name="output">xUnit output helper forwarded to the base test kit.</param>
        public AlpakkaTest(ITestOutputHelper output) : base(output)
        {
            Setup();
        }

        /// <summary>
        /// Builds the topic subscription, the test probe, one actor system + materializer
        /// for each side (producer / consumer) and the corresponding Kafka settings.
        /// </summary>
        protected void Setup()
        {
            testTopic = "akka100";

            subscription = Subscriptions.Topics(testTopic);

            probe = this.CreateTestProbe();

            // The HOCON file can carry settings such as
            //   akka.loglevel = DEBUG
            //   akka.suppress-json-serializer-warning = true
            var config = HoconConfigurationFactory.FromFile("akka.kafka.conf");

            var system_producer = ActorSystem.Create("TestKafka", config);
            materializer_producer = system_producer.Materializer();

            // The consumer materializer must come from the consumer system
            // (it was previously created from the producer system by mistake).
            var system_consumer = ActorSystem.Create("TestKafka", config);
            materializer_consumer = system_consumer.Materializer();

            // The extra actor systems are not owned by the test kit, so shut them down
            // together with the test kit's own system to avoid leaking them per test.
            Sys.RegisterOnTermination(() =>
            {
                system_producer.Terminate();
                system_consumer.Terminate();
            });

            // NOTE: a former call `Sys.Settings.Config.WithFallback(config)` was removed here.
            // WithFallback returns a new Config and its result was discarded, so it had no effect.

            producerSettings = ProducerSettings<Null, string>.Create(system_producer, null, null)
                .WithBootstrapServers(BootstrapServers);

            consumerSettings = ConsumerSettings<Null, string>.Create(system_consumer, null, null)
                .WithBootstrapServers(BootstrapServers)
                .WithGroupId("group1");
        }

        /// <summary>
        /// Produces <paramref name="testCnt"/> messages and expects the consumer stream to
        /// receive that many within <paramref name="cutoff"/> seconds.
        /// </summary>
        /// <param name="testCnt">Number of messages to produce and to wait for on the consumer side.</param>
        /// <param name="cutoff">Time limit, in seconds, for the consumer to report completion.</param>
        [Theory]
        [InlineData(20,10)] // produce and consume 20 messages; must finish within 10 seconds
        public void ProduceAndConsumeAreOK(int testCnt, int cutoff)
        {
            // Seconds to wait after starting the consumer before producing.
            int readyTimeForConsume = 3;

            int recCnt = 0;
            KafkaConsumer.PlainSource(consumerSettings, subscription)
            .RunForeach(result =>
            {
                Console.WriteLine($"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
                recCnt++;
                if (recCnt == testCnt)
                {
                    // Signal the probe exactly once, when the expected count is reached.
                    probe.Tell(ReceivedAllMessage);
                }
            }, materializer_consumer);

            Task.Delay(TimeSpan.FromSeconds(readyTimeForConsume)).Wait();

            Source<int, NotUsed> source = Source.From(Enumerable.Range(1, testCnt));

            source
                .Select(c => c.ToString())
                .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>(testTopic, elem)))
                .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
                .Select(result =>
                {
                    // Direct cast instead of `as`: an unexpected result type fails with a clear
                    // InvalidCastException rather than a NullReferenceException on the next line.
                    var response = (Result<Null, string, NotUsed>)result;
                    Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
                    return result;
                })
                .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer_producer);

            Within(TimeSpan.FromSeconds(cutoff), () =>
            {
                probe.ExpectMsg(ReceivedAllMessage, TimeSpan.FromSeconds(cutoff));
            });
        }
    }
}


참고:

  • No labels