Page History
| Info |
|---|
Apache Kafka(아파치 카프카)는 LinkedIn에서 개발된 분산 메시징 시스템으로써 2011년에 오픈소스로 공개되었다. 대용량의 실시간 로그처리에 특화된 아키텍처 설계를 통하여 akka.net을 통해 kafka 와 연동하여 kafka의 전송기능을 조금더 풍부하게 개조해보겠습니다 akka.net을 통해 kafka를 연동하는 몇가지 유용한 샘플을 작성해보겠습니다. src위치 : http://git.webnori.com/projects/AKKA/repos/akkastudy/browse/Solution/ServiceB/STUDY/KafkaTest.cs |
...
At-Least-Once Delivery 를 활용하여 Kafka에게 메시지를 꼭 메시지 전송 실패에 대응하여
중복없이 한번만 전송하는 모듈을 작성해보겠습니다.
이모듈은 kafka뿐아니라, 전송성공을 알려주는 그 어떠한 전송모듈과 연동이 가능합니다.
메시지전송의 세부적인 의미
- at-most-once : 한번만 전송하기 때문에 메시지가 유실될수 있습니다.
- at-least-once : 적어도 한번 보내려고, 여러번 보낼수 있기때문에 유실은 없지만, 중복 발생할수 있습니다.
- exactly-once : 정확하게 한번 보내려는 메카니즘으로, 중복이나 유실이 없습니다.
exactly-once 를 목표로 하지만, 100% 보장하기 어렵습니다.
TestCase :
- Kafka에 메시지 20개를 보낸다.
- 테스트를 위해 짝수번째 메시지는, 중간에 카프카에 전송되기전 중간에서 드롭을 시킨다.
- 전송부쪽에서는, Kafka의 성공메시지를 못받았기때문에 홀수번째 이전 짝수번째 메시지를 다시 보낸다.
- 10개의 메시지중 또 짝수번째는 실패한고, 다시보낸다. 이과정을 자동으로 반복한다.
- 결국 20개의 메시지가 모두 성공한다.성공처리가 될것이다.(카프카 메시지 큐검사)
사용부분
| Code Block | ||||
|---|---|---|---|---|
| ||||
actorSystem = ActorSystem.Create("ServiceB");
var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
for(int i=0; i<20;i++ )
{
kafkaActor.Tell("can you speak english? :" + i);
} |
전송부분은 액터의 심플한 전송을 그대로 사용합니다.
-사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터설계는 실패시 재전송을 합니다.액터는 실패시 재전송을 할수있게 설계합니다.
이러한 전송 기법을 발사후 망각이라고도 불리며
Fire and forgot : 전투기 조종모델에서, 적군 탱크에 유도 미사일을 쏘고나면
알아서 맞을테니... 그것을 잊고 나는 살아야겠으니, 적전투기의 추격을 피하는데 집중을 하겠다.
전투 전술용어라고 합니다. 이때 액터에서는 Tell 을 사용하며
반대로, 결과를 알고 싶으면 Ask를 사용하여 결과값을 체크하거나 기다릴수 있습니다.
이것은 비동기적으로 설계된 액터가, 언제든 동기적으로 운영이가능하며
동기적으로 작성된 함수도, 비동기적으로 변환이 가능합니다. - Future and Promise#futureactor
액터 설계부분
여기서 언급되지 않는 데이터 구조체는, 위에서 언급한 At-Least-Once Delivery에서 가져옵니다.
...
출력(sink,kafka 전송)량을 지정할수가 있습니다.
고전적인 방법에서는 타이머로 횟수를 제한하거나? ( 복잡도 증가 )
강제로 전송시 Sleep을 주었을 것입니다. ( 제어 주기조절이 어렵고, 블락발생)
AkkaStream은 처리 흐름제어를 조금더 유연하게 할수가 있습니다.
아래 샘플은 초당 1개의 메시지만 처리하도록준 옵션이며
단지, Throttle 라는 흐름량을 제어하는 밸브같은것을 추가하였습니다.
| Code Block | ||||
|---|---|---|---|---|
| ||||
ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;
kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);
actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);
factorials
.Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
.RunForeach(i => {
Console.WriteLine("SendOne");
client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
}, materializer); |
과제
- 위 두가지 기능을 합하기 ( 전송보장 + 흐름 속도제어)
- 역압력설정:카프카의 입력처리량보다 성공처리량이 늦을시 흐름속력을 늦춤 ( 성공승인 메시지 시간은 카프카 처리 최대 타임아웃으로 지정)