Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

Apache Kafka(아파치 카프카)는 LinkedIn에서 개발된 분산 메시징 시스템으로써 2011년에

오픈소스로 공개되었다. 대용량의 실시간 로그처리에 특화된 아키텍처 설계를 통하여
기존 메시징 시스템보다 우수한 TPS를 보여주고 있다.

akka.net을 통해 kafka를 kafka 와 연동하여

kafka의 전송기능을 조금더 풍부하게 개조해보겠습니다.

src위치 : http://git.webnori.com/projects/AKKA/repos/akkastudy/browse/Solution/ServiceB/STUDY/KafkaTest.cs

...

At-Least-Once Delivery 를 활용하여  Kafka에게 메시지를 메시지 전송 실패에 대응하여

중복없이 한번만 전송하는 모듈을 작성해보겠습니다.

이모듈은 kafka뿐아니라, 전송성공을 알려주는 그 어떠한 전송모듈과 연동이 가능합니다.


메시지전송의 세부적인 의미

  • at-most-once : 한번만 전송하기 때문에 메시지가 유실될수 있습니다.
  • at-least-once : 적어도 한번 보내려고, 여러번 보낼수 있기때문에 유실은 없지만, 중복 발생할수 있습니다.
  • exactly-once : 정확하게 한번 보내려는 메카니즘으로, 중복이나 유실이 없습니다.

exactly-once 를 목표로 하지만, 100% 보장하기 어렵습니다.


TestCase :

  • Kafka에 메시지 20개를 보낸다.
  • 테스트를 위해 짝수번째 메시지는, 중간에 카프카에 전송되기전 중간에서 드롭을 시킨다.
  • 전송부쪽에서는, Kafka의 성공메시지를 못받았기때문에 홀수번째 이전 짝수번째 메시지를 다시 보낸다.
  • 10개의 메시지중 또 짝수번째는 실패한고, 다시보낸다. 이과정을 자동으로 반복한다.
  • 결국 20개의 메시지가 모두 성공한다.성공처리가 될것이다.(카프카 메시지 큐검사)

사용부분

Code Block
languagec#
themeEmacs
            actorSystem = ActorSystem.Create("ServiceB");
            var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
            for(int i=0; i<20;i++ )
            {
                kafkaActor.Tell("can you speak english? :" + i);
            }

전송부분은 액터의 심플한 전송을 그대로 사용합니다.

-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.액터는 실패시 재전송을 할수있게 설계합니다.

이러한 전송 기법을 발사후 망각이라고도 불리며

Fire and forgot : 전투기 조종모델에서, 적군 탱크에 유도 미사일을 쏘고나면

알아서 맞을테니... 그것을 잊고 나는 살아야겠으니, 적전투기의 추격을 피하는데 집중을 하겠다.  

전투 전술용어라고 합니다. 이때 액터에서는 Tell 을 사용하며

반대로, 결과를 알고 싶으면 Ask를  사용하여 결과값을 체크하거나 기다릴수 있습니다.

이것은 비동기적으로 설계된 액터가, 언제든 동기적으로 운영이가능하며

동기적으로 작성된 함수도, 비동기적으로 변환이 가능합니다.  - Future and Promise#futureactor


액터 설계부분

여기서 언급되지 않는 데이터 구조체는, 위에서 언급한 At-Least-Once Delivery에서 가져옵니다.

...

AkkaStream은 처리 흐름제어를 조금더 유연하게 할수가 있습니다.


아래 샘플은 초당 1개의 메시지만 처리하도록준 옵션이며

단지, Throttle 라는 흐름량을 제어하는 밸브같은것을 추가하였습니다.

Code Block
languagec#
themeEmacs
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);

actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);

factorials
     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
     .RunForeach(i => {
         Console.WriteLine("SendOne");
         client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
     }, materializer);

...