닷넷코어에서 카프카를 사용하는방법과 로컬 테스트 환경을 수행하는 방법에대해 알아보겠습니다.

추가적으로 로컬 유닛테스트 환경을 위해 비동기 메시지를 관찰자액터를 통해 검증하는 방법까지 살펴보겠습니다.

src : https://github.com/psmon/CQRSAkka


사전준비환경 : 로컬 도커환경 ( 맥,윈도우 개발환경 호환)

도커 네트워크 생성

docker network create --driver=bridge --subnet=172.19.0.0/16 devnet

docker network inspect devnet

도커가 다중화되어있으면 overlay등도 할수 있으며, 여기서는 브릿지네트워크를 이용하여

각각의 컨테이너들의 내부 ip를 제어하겠습니다. 

도커컴포즈-카프카

docker-compose.yml
version: '3.5'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
    - '2181:2181'
    networks:
      devnet:
        ipv4_address: 172.19.0.20
    environment:
    - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    hostname: kafka
    image: 'bitnami/kafka:latest'
    ports:
    - '9092:9092'
    networks:
      devnet:
        ipv4_address: 172.19.0.21
    environment:
    - KAFKA_ADVERTISED_HOST_NAME=kafka
    - KAFKA_ZOOKEEPER_CONNECT=172.19.0.20:2181
    - ALLOW_PLAINTEXT_LISTENER=yes

networks:
  devnet:
    external:
      name: devnet

도커 환경이 구축이되었다고하면, 위 파일의 경로에서

docker-compose up  을 통해 카프카 수행이가능합니다.


카프카 생산자

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace DDDSample.Adapters.kafka
{
    public class KafkaProduce
    {
        private readonly Dictionary<string, object> config;
        private readonly Producer<Null, string> producer;
        private readonly String topic;

        public KafkaProduce(string server,string _topic)
        {
            config = new Dictionary<string, object>
            {
                { "bootstrap.servers", server },
                { "group.id","kafka_consumer" }
            };

            topic = _topic;


            producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8));
        }

        public void Produce(string data)
        {
            producer.ProduceAsync(topic, null, data).Wait();
            producer.Flush(100);
        }

        public async Task ProduceAsync(string data)
        {
            await producer.ProduceAsync(topic, null, data);
        }

        public void Flush(int milisecondTimeOut)
        {
            producer.Flush(milisecondTimeOut);
        }

    }
}

생산자에서 고민해야할것은 동기적으로 메시지를 보낼것인가? 비동기적으로 보낼것인가? 에 대한 고민을 해야합니다.

동기적 전송과 비동기는 다음과 같은 큰 차이가 있습니다.

  • 동기적 : 발송완료를 확인하지만 성능저하가 있을수 있으나, 에러핸들링이 쉬워짐
  • 비동기적 : 큐에 어느정도 쌓아놓고 주기적인 처리가가능 , 에러핸들링이 복잡해질수 있음


카프카 소비자

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace DDDSample.Adapters.kafka
{
    public class KafkaConsumer
    {
        private string server;
        private string topic;

        private CancellationToken ct;
        CancellationTokenSource tokenSource2;

        public Boolean HasMessage { get; set; }

        public KafkaConsumer(string _server,string _topic)
        {
            server = _server;
            topic = _topic;
            tokenSource2 = new CancellationTokenSource();
            ct = tokenSource2.Token;
        }

        public void Stop()
        {
            tokenSource2.Cancel();
        }

        public Task CreateConsumer(IActorRef consumeAoctor)
        {            
            var config = new Dictionary<string, object>
                {
                    {"group.id","kafka_consumer" },
                    {"bootstrap.servers", server },
                    { "enable.auto.commit", "false" }
                };

            Console.WriteLine("kafka StartConsumer ");

            var task = new Task(() => {

                // Were we already canceled?
                ct.ThrowIfCancellationRequested();

                using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
                {
                    consumer.Subscribe(topic);
                    consumer.OnMessage += (_, msg) => {
                        //message(msg.Value);                        
                        Console.WriteLine(string.Format("kafka msg {0} === {1}",msg.Offset.Value,msg.Value));
                        if (consumeAoctor != null) consumeAoctor.Tell(new KafkaMessage(msg.Topic, msg.Value));
                        HasMessage = true;
                    };
                    
                    while (true)
                    {
                        if (ct.IsCancellationRequested)
                        {
                            // Clean up here, then...
                            ct.ThrowIfCancellationRequested();
                        }
                        consumer.Poll(100);
                        //consumer.CommitAsync();
                    }
                }
                
            }, tokenSource2.Token);
            
            return task;
        }
    }
}

소비를 처리하는것은 , 해당 메시지를 받고 그 메시지에대한 처리를 해야하기때문에 어떻게 전달하고 누가 처리할것인가라?

기본적인 이벤트처리를 전달하는 방법에대해 고민을 해야합니다. 소비를 처리하는 액터에게 단순하게 전달하였습니다.

토픽에따라 처리액터를 구분할수도 있고, 더 나아가 메시지 타잎에 따라 분기도 가능하지만 중요한것은 액터모델은 카프카의 메시지에 대한

필터처리를 유연하게할수 있는 장점이 있으며 AkkaStream과 연동시 스트림 Flow처리를 유연하게 할수 있습니다.


아래 유닛테스트 샘플은 , 생산→소비 의 카프카 라이프 사이클을 간단하게 확인할수 있는 유닛테스트이며

복잡한 원격 비동기 메시지 처리를 직관적이고 단순화 시킬수 있는지 알수 있습니다.

카프카 유닛 테스트

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.NUnit3;
using DDDSample.Adapters.kafka;
using NUnit.Framework;

namespace DDDSampleTest.Kafka
{
    public class KafkaConsumerTest : TestKit
    {
        KafkaProduce kafkaProduce;
        KafkaConsumer kafkaConsumer;
        TestProbe probe;

        [SetUp]
        public void Setup()
        {
            kafkaConsumer = new KafkaConsumer("kafka:9092", "test_consumer");
            probe = this.CreateTestProbe();
            kafkaConsumer.CreateConsumer(probe).Start();
            
            kafkaProduce = new KafkaProduce("kafka:9092", "test_consumer");           
        }

        [Test]
        public void ProduceAndConsumerTest()
        {            
            kafkaProduce.Produce("SomeMessage");
            
            Within(TimeSpan.FromSeconds(3), () => {
                
                AwaitCondition(() => probe.HasMessages);
                
                probe.ExpectMsg<KafkaMessage>(TimeSpan.FromSeconds(0));

                KafkaMessage lastMessage = probe.LastMessage as KafkaMessage;

                Assert.AreEqual("SomeMessage", lastMessage.message);

            });
        }
    }
}

유닛테스트에대한 이야기를 잠깐하면  유닛테스트만으로 모듈사용에 대한 가이드를 제공하고, 개선 검증활동으로 이어져야한다는것입니다.

어려운 목표이지만, 로직에대한 흐름의 설명은 주석이 아닌 코드자체로 되어야 한다란것이며

위 코드만으로 카프카에 대한 사용가 이해가 된다고하면, 위 유닛테스트 작성은 성공적이나 그렇지 않고 무엇을 의미하고 테스트를 하는것인가?

이해가 되지 않는다고 하면 지속적으로 개선되어야할 필요가 있습니다.


마무리

메시지 전송보장을 위해 조금더 개선해야할 부분이 남아있으며

"정확하게 한번 전송"에는 다양한 IT의 의미와 인프라적인 요소가 담겨져 있습니다.   -참고 : MessageDeliveryReliability

위 코드는, 다양한 도메인의 요구사항에 따라 더 개선점이 있을수 있으며 , 도메인의 이해와 함께 카프카의 옵션을 같이 이해하고 직접 설계에 반영을 해야합니다.



  • No labels