닷넷코어 콘솔에서 Akka를 통해 Kafka를 심플하게 사용할수 있는
AkkaDotModule를 부분이용한 샘플 HeadLessService 를 구성해보았습니다.
여기서 설명된 구현코드는 다음 Git저장소에서 확인및 실행가능합니다.
의존요소
콘솔APP 의존요소
netcoreapp3.1 에서 Test되었으며, 콘솔프로그램에서 다음 의존요소를 사용합니다.
<PackageReference Include="AkkaDotModule.Webnori" Version="1.1.2" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.14" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.14" />
AkkaDotModule.Webnori 의존요소
AkkaDotModule은 AKKA+KAFKA 의 안정적인 모듈조합을 테스트하고 있으며
다음버전으로 동작 확인 작동검증 되었습니다.
<PackageReference Include="Akka.Streams" Version="1.4.12" />
<PackageReference Include="Akka.Streams.Kafka" Version="1.1.0" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
준비 로컬 인프라
Docker를 통한 Kafka구동
bitnami의 StandAlone Kafka 를 이용하였으며
docker-compose up -d 를 통해 구동가능합니다.
version: '3.4'
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
hostname: kafka
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
Topic생성
Kafka의 연결을 확인하고, 테스트에 사용할 topic을 생성합니다. ( kafka tool을 이용하였습니다.)
topic명 : akka100
Topic확인
이 샘플 프로젝트를, 성공적으로 1회 수행하면 10개의 메시지가 Kafka-Topic에 쌓이게 됩니다.
작동컨셉
Pub/Sub 컨셉을 알고 있으면, Kafka의 생산과 소비를 심플한 코드로 활용할수 있게
구성하였습니다.
구현
StandAlone HeadlessService
using AkkaNetWithKafka.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace AkkaNetWithKafka
{
internal class Program
{
private static async Task Main(string[] args)
{
var host = new HostBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddLogging();
// register our host service
services.AddHostedService<KafkaService>();
})
.ConfigureLogging((hostContext, configLogging) =>
{
configLogging.AddConsole();
})
.UseConsoleLifetime()
.Build();
await host.RunAsync();
}
}
}
Console Base,IHostedService를 활용하여 KafkaService를 백그라운드로 작동 하였습니다.
Kafka의 소비기를 작동하기위해서는 항상작동하는 백그라운드 서비스가 필수입니다.
KafkaService
생산기와 소비기를 동시에 생성하고, 샘플로 메시지 생산을 하였습니다.
이 샘플코드를 참고하여, 서비스에 맞는 Kafka를 활용하는 서비스를 구현할수 있습니다.
using Akka.Actor;
using AkkaDotModule.Kafka;
using AkkaNetWithKafka.Actors;
using Microsoft.Extensions.Hosting;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ConsumerSystem = AkkaNetWithKafka.Modules.ConsumerSystem;
using ProducerSystem = AkkaNetWithKafka.Modules.ProducerSystem;
namespace AkkaNetWithKafka.Services
{
public sealed class KafkaService : IHostedService
{
private ActorSystem AkkaSystem;
private ConsumerSystem KafkaConsumerSystem;
private ProducerSystem KafkaProducerSystem;
public Task StartAsync(CancellationToken cancellationToken)
{
AkkaSystem = ActorSystem.Create("KafkaService");
KafkaConsumerSystem = new ConsumerSystem();
KafkaProducerSystem = new ProducerSystem();
var consumerActor = AkkaSystem.ActorOf(Props.Create(() => new ConsumerActor()),
"consumerActor" /*AKKA가 인식하는 Path명*/);
//소비자 : 복수개의 소비자 생성가능
KafkaConsumerSystem.Start(new ConsumerAkkaOption()
{
KafkaGroupId = "testGroup",
BootstrapServers = "kafka:9092",
RelayActor = consumerActor, //소비되는 메시지가 지정 액터로 전달
Topics = "akka100",
});
//생산자 : 복수개의 생산자 생성가능
KafkaProducerSystem.Start(new ProducerAkkaOption()
{
BootstrapServers = "kafka:9092",
ProducerName = "producer1",
});
List<string> messages = new List<string>();
for (int i = 0; i < 10; i++)
{
messages.Add($"message-{i}");
}
//보너스 : 생산의 TPS 속도를 조절할수 있습니다.
int tps = 10;
KafkaProducerSystem.SinkMessage("producer1", "akka100", messages, tps);
return Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
// strictly speaking this may not be necessary - terminating the ActorSystem would also work
// but this call guarantees that the shutdown of the cluster is graceful regardless
await CoordinatedShutdown.Get(AkkaSystem).Run(CoordinatedShutdown.ClrExitReason.Instance);
}
}
}
생산속도조절기능
int tps = 10;
KafkaProducerSystem.SinkMessage("producer1", "akka100", messages, tps);
생산속도를 조절하여, 소비속도를 조절할수 있습니다. Akka의 Stream기가 사용되었습니다.
생산속도를 조절하지 않는다면, 해당 구현부를 응용하여 소비속만 조절을 할수있는 커스텀한 로직추가 가능합니다.
ConsumerActor
Kafka의 소비처리가 된 메시지는 액터를 통해 전달됨으로, 처리에만 집중하면 되겠습니다.
using Akka.Actor;
using Akka.Event;
namespace AkkaNetWithKafka.Actors
{
public class ConsumerActor : ReceiveActor
{
private readonly ILoggingAdapter logger = Context.GetLogger();
public ConsumerActor()
{
ReceiveAsync<string>(async strData =>
{
logger.Info("ConsumerActor IncomeMessage:" + strData);
//TODO : Somthing
});
}
}
}
Kafka의 생산/소비기에 액터를 연결함으로, Kafka가 제공하는 분산처리 고성능 퍼시던트 메시지 기능외에
Akka의 다양한 유틸리티기를 추가적으로 확장이용 할수 있습니다. 서로 다른 구현체의 메시지처리기가 유연하게 연결되어 확장하는것은 Reactive Stream 활동의 목표이기도합니다.
참고링크
- https://github.com/confluentinc/confluent-kafka-dotnet
- https://github.com/akkadotnet/Akka.Streams.Kafka
- https://github.com/reactive-streams/reactive-streams-dotnet


