닷넷코어 콘솔에서 Akka를 통해 Kafka를 심플하게 사용할수 있는

AkkaDotModule를 부분이용한 샘플 HeadLessService 를 구성해보았습니다.


여기서 설명된 구현코드는 다음 Git저장소에서 확인및 실행가능합니다.

git : https://github.com/psmon/AkkaNetWithKafka


의존요소

콘솔APP 의존요소

netcoreapp3.1 에서 Test되었으며, 콘솔프로그램에서 다음 의존요소를 사용합니다. 

    <PackageReference Include="AkkaDotModule.Webnori" Version="1.1.2" />
    <PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.14" />
    <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.14" />

AkkaDotModule.Webnori 의존요소

AkkaDotModule은 AKKA+KAFKA 의 안정적인 모듈조합을 테스트하고 있으며

다음버전으로 동작 확인 작동검증 되었습니다.

    <PackageReference Include="Akka.Streams" Version="1.4.12" />
    <PackageReference Include="Akka.Streams.Kafka" Version="1.1.0" />
    <PackageReference Include="Confluent.Kafka" Version="1.5.2" />


준비 로컬 인프라

Docker를 통한 Kafka구동

bitnami의 StandAlone Kafka 를 이용하였으며

docker-compose up -d 를 통해 구동가능합니다.

version: '3.4'
 
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
    - '2181:2181'
    environment:
    - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    hostname: kafka
    image: 'bitnami/kafka:latest'
    ports:
    - '9092:9092'
    environment:
    - KAFKA_ADVERTISED_HOST_NAME=kafka
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes

        


Topic생성

Kafka의 연결을 확인하고, 테스트에 사용할 topic을 생성합니다. ( kafka tool을 이용하였습니다.)

topic명 : akka100

Topic확인

이 샘플 프로젝트를, 성공적으로 1회 수행하면 10개의 메시지가 Kafka-Topic에 쌓이게 됩니다.


작동컨셉

Pub/Sub 컨셉을 알고 있으면, Kafka의 생산과 소비를 심플한 코드로 활용할수 있게

구성하였습니다.


구현

StandAlone HeadlessService

using AkkaNetWithKafka.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace AkkaNetWithKafka
{
    internal class Program
    {
        private static async Task Main(string[] args)
        {
            var host = new HostBuilder()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddLogging();

                // register our host service
                services.AddHostedService<KafkaService>();

                })
                .ConfigureLogging((hostContext, configLogging) =>
                {                    
                    configLogging.AddConsole();

                })
                .UseConsoleLifetime()
                .Build();

            await host.RunAsync();
        }
    }
}

Console Base,IHostedService를 활용하여 KafkaService를 백그라운드로 작동 하였습니다.

Kafka의 소비기를 작동하기위해서는 항상작동하는 백그라운드 서비스가 필수입니다. 

참고링크 : https://docs.microsoft.com/ko-kr/dotnet/architecture/microservices/multi-container-microservice-net-applications/background-tasks-with-ihostedservice

KafkaService

생산기와 소비기를 동시에 생성하고, 샘플로 메시지 생산을 하였습니다.

이 샘플코드를 참고하여, 서비스에 맞는 Kafka를 활용하는 서비스를 구현할수 있습니다.

using Akka.Actor;
using AkkaDotModule.Kafka;
using AkkaNetWithKafka.Actors;
using Microsoft.Extensions.Hosting;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ConsumerSystem = AkkaNetWithKafka.Modules.ConsumerSystem;
using ProducerSystem = AkkaNetWithKafka.Modules.ProducerSystem;

namespace AkkaNetWithKafka.Services
{
    public sealed class KafkaService : IHostedService
    {
        private ActorSystem AkkaSystem;

        private ConsumerSystem KafkaConsumerSystem;

        private ProducerSystem KafkaProducerSystem;        

        public Task StartAsync(CancellationToken cancellationToken)
        {
            AkkaSystem = ActorSystem.Create("KafkaService");

            KafkaConsumerSystem = new ConsumerSystem();

            KafkaProducerSystem = new ProducerSystem();

            var consumerActor = AkkaSystem.ActorOf(Props.Create(() => new ConsumerActor()),
                    "consumerActor" /*AKKA가 인식하는 Path명*/);

            //소비자 : 복수개의 소비자 생성가능
            KafkaConsumerSystem.Start(new ConsumerAkkaOption()
            {
                KafkaGroupId = "testGroup",
                BootstrapServers = "kafka:9092",
                RelayActor = consumerActor,               //소비되는 메시지가 지정 액터로 전달
                Topics = "akka100",
            });

            //생산자 : 복수개의 생산자 생성가능
            KafkaProducerSystem.Start(new ProducerAkkaOption()
            {
                BootstrapServers = "kafka:9092",
                ProducerName = "producer1",
            });

            
            List<string> messages = new List<string>();
            for (int i = 0; i < 10; i++)
            {
                messages.Add($"message-{i}");
            }

            //보너스 : 생산의 TPS 속도를 조절할수 있습니다.
            int tps = 10;
            KafkaProducerSystem.SinkMessage("producer1", "akka100", messages, tps);


            return Task.CompletedTask;
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            // strictly speaking this may not be necessary - terminating the ActorSystem would also work
            // but this call guarantees that the shutdown of the cluster is graceful regardless
            await CoordinatedShutdown.Get(AkkaSystem).Run(CoordinatedShutdown.ClrExitReason.Instance);
        }
    }
}

생산속도조절기능

            int tps = 10;
            KafkaProducerSystem.SinkMessage("producer1", "akka100", messages, tps);

생산속도를 조절하여, 소비속도를 조절할수 있습니다. Akka의 Stream기가 사용되었습니다.

생산속도를 조절하지 않는다면, 해당 구현부를 응용하여 소비속만 조절을 할수있는 커스텀한 로직추가 가능합니다.


ConsumerActor

Kafka의 소비처리가 된 메시지는 액터를 통해 전달됨으로, 처리에만 집중하면 되겠습니다.

using Akka.Actor;
using Akka.Event;

namespace AkkaNetWithKafka.Actors
{

    public class ConsumerActor : ReceiveActor
    {
        private readonly ILoggingAdapter logger = Context.GetLogger();

        public ConsumerActor()
        {
            ReceiveAsync<string>(async strData =>
            {                
                logger.Info("ConsumerActor IncomeMessage:" + strData);
                //TODO : Somthing
            });
        }
    }
}

Kafka의 생산/소비기에 액터를 연결함으로, Kafka가 제공하는 분산처리 고성능 퍼시던트 메시지 기능외에

Akka의 다양한 유틸리티기를 추가적으로 확장이용 할수 있습니다.  서로 다른 구현체의 메시지처리기가  유연하게 연결되어 확장하는것은 Reactive Stream 활동의 목표이기도합니다.

참고링크



  • No labels