Page History
| Info |
|---|
akka.net을 통해 kafka를 연동하는 몇가지 유용한 샘플을 작성해보겠습니다. |
| Table of Contents |
|---|
초당 전송횟수 조절
Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이
...
- Kafka에 메시지 20개를 보낸다.
- 테스트를 위해 짝수번째 메시지는, 중간에 드롭을 시킨다.
- 전송부쪽에서는, Kafka의 성공메시지를 못받았기때문에 홀수번째 메시지를 다시 보낸다.
- 결국 20개의 메시지가 모두 성공한다.
사용부분
| Code Block | ||||
|---|---|---|---|---|
| ||||
actorSystem = ActorSystem.Create("ServiceB");
var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
for(int i=0; i<20;i++ )
{
kafkaActor.Tell("can you speak english? :" + i);
} |
...
-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.
액터 설계부분
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
public class KafkaSendActor : ReceiveActor
{
private ILoggingAdapter log = Context.GetLogger();
protected int messageCnt = 0;
private KafkaOptions kfka_options;
private BrokerRouter kfka_router;
private Producer client;
private string topic = "test";
public KafkaSendActor()
{
kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
client = new Producer(kfka_router);
IActorRef parentActor = null;
Receive<Msg>(msg =>
{
parentActor = Sender;
messageCnt++;
//Test를 위해 메시지 중간에 드랍...
if ( messageCnt % 2 == 0 )
{
log.Warning("Test for Drop Message");
return;
}
log.Debug("Yes I can");
HandleProducer(msg);
});
Receive<Confirm>(msg =>
{
if( msg!=null)
parentActor.Tell(msg, Self);
});
}
public void HandleProducer( Msg msg)
{
Task.Run(async () =>
{
List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
ProduceResponse resOne = reponse[0];
Confirm confirm = new Confirm(msg.DeliveryId);
if ( resOne.Error == 0 )
return confirm;
else
return null;
}).PipeTo(Self);
}
} |
...