닷넷코어 콘솔에서 Akka를 통해 Kafka를 심플하게 사용할수 있는
AkkaDotModule를 부분이용한 샘플 HeadLessService 를 구성해보았습니다.
여기서 설명된 구현코드는 다음 Git저장소에서 확인및 실행가능합니다.
의존요소
콘솔APP 의존요소
netcoreapp3.1 에서 Test되었으며, 콘솔프로그램에서 다음 의존요소를 사용합니다.
<PackageReference Include="AkkaDotModule.Webnori" Version="1.1.2" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.14" /> <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.14" />
AkkaDotModule.Webnori 의존요소
AkkaDotModule은 AKKA+KAFKA 의 안정적인 모듈조합을 테스트하고 있으며
다음버전으로 동작 확인 작동검증 되었습니다.
<PackageReference Include="Akka.Streams" Version="1.4.12" /> <PackageReference Include="Akka.Streams.Kafka" Version="1.1.0" /> <PackageReference Include="Confluent.Kafka" Version="1.5.2" />
준비 로컬 인프라
Docker를 통한 Kafka구동
bitnami의 StandAlone Kafka 를 이용하였으며
docker-compose up -d 를 통해 구동가능합니다.
version: '3.4' services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: hostname: kafka image: 'bitnami/kafka:latest' ports: - '9092:9092' environment: - KAFKA_ADVERTISED_HOST_NAME=kafka - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes
Topic생성
Kafka의 연결을 확인하고, 테스트에 사용할 topic을 생성합니다. ( kafka tool을 이용하였습니다.)
topic명 : akka100
Topic확인
이 샘플 프로젝트를, 성공적으로 1회 수행하면 10개의 메시지가 Kafka-Topic에 쌓이게 됩니다.
작동컨셉
Pub/Sub 컨셉을 알고 있으면, Kafka의 생산과 소비를 심플한 코드로 활용할수 있게
구성하였습니다.
구현
StandAlone HeadlessService
using AkkaNetWithKafka.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading.Tasks; namespace AkkaNetWithKafka { internal class Program { private static async Task Main(string[] args) { var host = new HostBuilder() .ConfigureServices((hostContext, services) => { services.AddLogging(); // register our host service services.AddHostedService<KafkaService>(); }) .ConfigureLogging((hostContext, configLogging) => { configLogging.AddConsole(); }) .UseConsoleLifetime() .Build(); await host.RunAsync(); } } }
Console Base,IHostedService를 활용하여 KafkaService를 백그라운드로 작동 하였습니다.
Kafka의 소비기를 작동하기위해서는 항상작동하는 백그라운드 서비스가 필수입니다.
KafkaService
생산기와 소비기를 동시에 생성하고, 샘플로 메시지 생산을 하였습니다.
이 샘플코드를 참고하여, 서비스에 맞는 Kafka를 활용하는 서비스를 구현할수 있습니다.
using Akka.Actor; using AkkaDotModule.Kafka; using AkkaNetWithKafka.Actors; using Microsoft.Extensions.Hosting; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using ConsumerSystem = AkkaNetWithKafka.Modules.ConsumerSystem; using ProducerSystem = AkkaNetWithKafka.Modules.ProducerSystem; namespace AkkaNetWithKafka.Services { public sealed class KafkaService : IHostedService { private ActorSystem AkkaSystem; private ConsumerSystem KafkaConsumerSystem; private ProducerSystem KafkaProducerSystem; public Task StartAsync(CancellationToken cancellationToken) { AkkaSystem = ActorSystem.Create("KafkaService"); KafkaConsumerSystem = new ConsumerSystem(); KafkaProducerSystem = new ProducerSystem(); var consumerActor = AkkaSystem.ActorOf(Props.Create(() => new ConsumerActor()), "consumerActor" /*AKKA가 인식하는 Path명*/); //소비자 : 복수개의 소비자 생성가능 KafkaConsumerSystem.Start(new ConsumerAkkaOption() { KafkaGroupId = "testGroup", BootstrapServers = "kafka:9092", RelayActor = consumerActor, //소비되는 메시지가 지정 액터로 전달 Topics = "akka100", }); //생산자 : 복수개의 생산자 생성가능 KafkaProducerSystem.Start(new ProducerAkkaOption() { BootstrapServers = "kafka:9092", ProducerName = "producer1", }); List<string> messages = new List<string>(); for (int i = 0; i < 10; i++) { messages.Add($"message-{i}"); } //보너스 : 생산의 TPS 속도를 조절할수 있습니다. int tps = 10; KafkaProducerSystem.SinkMessage("producer1", "akka100", messages, tps); return Task.CompletedTask; } public async Task StopAsync(CancellationToken cancellationToken) { // strictly speaking this may not be necessary - terminating the ActorSystem would also work // but this call guarantees that the shutdown of the cluster is graceful regardless await CoordinatedShutdown.Get(AkkaSystem).Run(CoordinatedShutdown.ClrExitReason.Instance); } } }
생산속도조절기능
int tps = 10; KafkaProducerSystem.SinkMessage("producer1", "akka100", messages, tps);
생산속도를 조절하여, 소비속도를 조절할수 있습니다. Akka의 Stream기가 사용되었습니다.
생산속도를 조절하지 않는다면, 해당 구현부를 응용하여 소비속만 조절을 할수있는 커스텀한 로직추가 가능합니다.
ConsumerActor
Kafka의 소비처리가 된 메시지는 액터를 통해 전달됨으로, 처리에만 집중하면 되겠습니다.
using Akka.Actor; using Akka.Event; namespace AkkaNetWithKafka.Actors { public class ConsumerActor : ReceiveActor { private readonly ILoggingAdapter logger = Context.GetLogger(); public ConsumerActor() { ReceiveAsync<string>(async strData => { logger.Info("ConsumerActor IncomeMessage:" + strData); //TODO : Somthing }); } } }
Kafka의 생산/소비기에 액터를 연결함으로, Kafka가 제공하는 분산처리 고성능 퍼시던트 메시지 기능외에
Akka의 다양한 유틸리티기를 추가적으로 확장이용 할수 있습니다. 서로 다른 구현체의 메시지처리기가 유연하게 연결되어 확장하는것은 Reactive Stream 활동의 목표이기도합니다.
참고링크
- https://github.com/confluentinc/confluent-kafka-dotnet
- https://github.com/akkadotnet/Akka.Streams.Kafka
- https://github.com/reactive-streams/reactive-streams-dotnet