Page History
...
-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.
액터 설계부분
여기서 언급되지 않는 데이터 구조체는, 위에서 언급한 At-Least-Once Delivery에서 가져옵니다.
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
public class KafkaSendActor : ReceiveActor
{
private ILoggingAdapter log = Context.GetLogger();
protected int messageCnt = 0;
private KafkaOptions kfka_options;
private BrokerRouter kfka_router;
private Producer client;
private string topic = "test";
public KafkaSendActor()
{
kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
client = new Producer(kfka_router);
IActorRef parentActor = null;
Receive<Msg>(msg =>
{
parentActor = Sender;
messageCnt++;
//Test를 위해 메시지 중간에 드랍...
if ( messageCnt % 2 == 0 )
{
log.Warning("Test for Drop Message");
return;
}
log.Debug("Yes I can");
HandleProducer(msg);
});
Receive<Confirm>(msg =>
{
if( msg!=null)
parentActor.Tell(msg, Self);
});
}
public void HandleProducer( Msg msg)
{
Task.Run(async () =>
{
List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
ProduceResponse resOne = reponse[0];
Confirm confirm = new Confirm(msg.DeliveryId);
if ( resOne.Error == 0 )
return confirm;
else
return null;
}).PipeTo(Self);
}
} |
...