akka.net을 통해 kafka를 연동하는 몇가지 유용한 샘플을 작성해보겠습니다. |
At-Least-Once Delivery 를 활용하여 Kafka에게 메시지를 꼭 한번만 전송하는
모듈을 작성해보겠습니다.
TestCase :
actorSystem = ActorSystem.Create("ServiceB");
var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
for(int i=0; i<20;i++ )
{
kafkaActor.Tell("can you speak english? :" + i);
} |
전송부분은 액터의 심플한 전송을 그대로 사용합니다.
-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.
여기서 언급되지 않는 데이터 구조체는, 위에서 언급한 At-Least-Once Delivery에서 가져옵니다.
public class KafkaSendActor : ReceiveActor
{
private ILoggingAdapter log = Context.GetLogger();
protected int messageCnt = 0;
private KafkaOptions kfka_options;
private BrokerRouter kfka_router;
private Producer client;
private string topic = "test";
public KafkaSendActor()
{
kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
client = new Producer(kfka_router);
IActorRef parentActor = null;
Receive<Msg>(msg =>
{
parentActor = Sender;
messageCnt++;
//Test를 위해 메시지 중간에 드랍...
if ( messageCnt % 2 == 0 )
{
log.Warning("Test for Drop Message");
return;
}
log.Debug("Yes I can");
HandleProducer(msg);
});
Receive<Confirm>(msg =>
{
if( msg!=null)
parentActor.Tell(msg, Self);
});
}
public void HandleProducer( Msg msg)
{
Task.Run(async () =>
{
List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
ProduceResponse resOne = reponse[0];
Confirm confirm = new Confirm(msg.DeliveryId);
if ( resOne.Error == 0 )
return confirm;
else
return null;
}).PipeTo(Self);
}
} |
카프카의 비동기 결과처리를 Pipe를 통해 연결하여 비동기처리가 유지되도록
카프카 전송모듈을 작성하고, 카프카로부터 전송 성공메시지를 받았을시 부모액터에게
해당메시지는 잘 처리되었다는 승인 메시지를 보냅니다. 승인메시지를 보내지 못할시
부모액터는 실패한메시지를 재전송을 시도하게될것입니다.
public class KafkaAtLeastOnceDeliveryReceiveActor : AtLeastOnceDeliveryReceiveActor
{
private readonly IActorRef _destionationActor = Context.ActorOf<KafkaSendActor>();
private ILoggingAdapter log = Context.GetLogger();
public KafkaAtLeastOnceDeliveryReceiveActor()
{
Recover<MsgSent>(msgSent => Handler(msgSent));
Recover<MsgConfirmed>(msgConfirmed => Handler(msgConfirmed));
Command<string>(str =>
{
log.Debug("received:" + str);
Persist(new MsgSent(str), Handler);
});
Command<Confirm>(confirm =>
{
//메시지 받음을 확인하고, 해당 메시지를 더이상 안보낸다.
log.Debug("received confirm:" + confirm.DeliveryId);
Persist(new MsgConfirmed(confirm.DeliveryId), Handler);
});
}
private void Handler( MsgSent msgSent )
{
Deliver(_destionationActor.Path, l => new Msg(l, msgSent.Message));
}
private void Handler( MsgConfirmed msgConfirmed )
{
ConfirmDelivery(msgConfirmed.DeliveryId);
}
public override string PersistenceId { get; } = "persistence-id";
} |
재전송 모듈 작성은 간단합니다. AtLeastOnceDeliveryReceiveActor 라는 준비된 액터가 있어서 이것을 상속받아
우리가 원하는 몇가지 메시지 기능을 연결하면 됩니다.
짝수번째 메시지를 최초 성공시키고... 드랍된 메시지(홀수번쨰) 도 결국 성공합니다. 아래 창은 실 카프카의 전송받은 로그창이며 메시지가 중복없음을 확인할수 있습니다. src위치 : http://git.webnori.com/projects/AKKA/repos/akkastudy/browse/Solution/ServiceB/STUDY/KafkaTest.cs |
Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이
출력(sink,kafka 전송)량을 지정할수가 있습니다.
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;
kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);
actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);
factorials
.Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
.RunForeach(i => {
Console.WriteLine("SendOne");
client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
}, materializer); |