kafka 자체로 고성능 전송처리를 간편하게 이용할수 있으며

kafka client를 통하여 여러 플랫폼에서 전송 처리가 가능합니다.

kafka 기능을 Actor혹은 Akka Stream과 연동하여 조금더 유연하고 커스텀한

메시지 처리기능을 만들어 보겠습니다. 이것을 하기전

kafka를 기본적으로 사용하는 방법부터 간단하게 알아 보겠습니다.


AkkaStream은 흐름을 합하고, 분기하는 Graph-DSL 개념을 사용하여 복잡한형태의

스트림처리를 단순화할수 있습니다.  이것은 양끝단은 안정된 스트림서비스를 사용하고

스트림처리의 중간과정을 커스텀화할수 있습니다. 예를 들어 중간에 흐름을 제어하는

중간 벨브(throttle)를 간단하게 추가할수가 있습니다. 

준비하기

1.설치

카프카 서버 준비하기 :

준비하기

kafka는 Java버젼으로 구동되기떄문에 OS별로 자바를 셋팅하고 실행하는 방식의

차이일뿐 kafka를 사용하는 방식은 동일합니다. 여기서는 윈도우 기준으로 설명하겠습니다.



download

Link : https://kafka.apache.org/downloads

설정

사용하고 싶은 버젼을 다운로드 받고, 특정경로에 압축을 풉니다.

ex>  C:\datatool\kafka_2.12-1.0>

path 설정

윈도우 환경 변수에, 어느 디렉토리에건 실행할수 있게 path를 추가해줍니다.

C:\datatool\kafka_2.12-1.0\bin\windows

실행

C:\datatool\kafka_2.12-1.0> 경로에서 각 서버들을 실행하겠습니다.

주키퍼 실행

zookeeper-server-start config/zookeeper.properties


카프카 실행

kafka-server-start config/server.properties


Topic생성

kafka가 잘 실행되는지 Topic 생성을 통해 확인가능합니다.

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test


Topic Console Listen

특정 토픽에 메시지 생산모듈을 작성한후

특정 토픽에서 메시지큐처리가 되는지 체크합니다.

kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning


또는 KafkaTool을 이용하여 UI에서 확인가능합니다.

http://www.kafkatool.com/download.html




2.솔류션내 라이브러리 설치(.net)


KAFKA 사용하기

Producer
using KafkaNet;
  var options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
  var router = new BrokerRouter(options);
  var client = new Producer(router);
  client.SendMessageAsync("test", new[] { new Message("hello world") }).Wait(); 

Consumer
var options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("TestHarness", router));

//Consume returns a blocking IEnumerable (ie: never ending stream)
foreach (var message in consumer.Consume())
{
    Console.WriteLine("Response: P{0},O{1} : {2}", 
        message.Meta.PartitionId, message.Meta.Offset, message.Value);  
}

KAFKA는 메시지를 생산하는 생산자(Producer)

그리고 쌓인 메시지를 처리하는 소비자(Consumer) 로 구분하며

그것을 중계하는 서비스를 구동하는 아주 직관적인 메시지 큐 솔류션입니다.

KAFKA를 AKKA.net에서 확장하기

KAFKA의 기본 메시지 큐를 이용하여

다음과 같이AKKA를 통해 확장할것입니다.

  • 유실없는 신뢰성메시지 전송
  • 다양한 전략으로 메시지 전송량 제어(역압력,초당처리등)
  • 카프카 기본 토픽외에 액터를 통한 서브 채널 핸들링\


Apache Kafka(아파치 카프카)는 LinkedIn에서 개발된 분산 메시징 시스템으로써 2011년에

오픈소스로 공개되었다. 대용량의 실시간 로그처리에 특화된 아키텍처 설계를 통하여
기존 메시징 시스템보다 우수한 TPS를 보여주고 있다.

akka.net을 통해 kafka 와 연동하여

kafka의 전송기능을 조금더 풍부하게 개조해보겠습니다.

src위치 : http://git.webnori.com/projects/AKKA/repos/akkastudy/browse/Solution/ServiceB/STUDY/KafkaTest.cs


At-Least-Once Delivery

At-Least-Once Delivery 를 활용하여  Kafka에게 메시지 전송 실패에 대응하여

중복없이 한번만 전송하는 모듈을 작성해보겠습니다.

이모듈은 kafka뿐아니라, 전송성공을 알려주는 그 어떠한 전송모듈과 연동이 가능합니다.


메시지전송의 세부적인 의미

  • at-most-once : 한번만 전송하기 때문에 메시지가 유실될수 있습니다.
  • at-least-once : 적어도 한번 보내려고, 여러번 보낼수 있기때문에 유실은 없지만, 중복 발생할수 있습니다.
  • exactly-once : 정확하게 한번 보내려는 메카니즘으로, 중복이나 유실이 없습니다.

exactly-once 를 목표로 하지만, 100% 보장하기 어렵습니다.


TestCase :

  • Kafka에 메시지 20개를 보낸다.
  • 테스트를 위해 짝수번째 메시지는, 카프카에 전송되기전 중간에서 드롭을 시킨다.
  • 전송부쪽에서는, Kafka의 성공메시지를 못받았기때문에 이전 짝수번째 메시지를 다시 보낸다.
  • 10개의 메시지중 또 짝수번째는 실패한고, 다시보낸다. 이과정을 자동으로 반복한다.
  • 결국 20개의 메시지가 모두 성공처리가 될것이다.(카프카 메시지 큐검사)

사용부분

            actorSystem = ActorSystem.Create("ServiceB");
            var kafkaActor = actorSystem.ActorOf<KafkaAtLeastOnceDeliveryReceiveActor>("kafka");
            for(int i=0; i<20;i++ )
            {
                kafkaActor.Tell("can you speak english? :" + i);
            }

전송부분은 액터의 심플한 전송을 그대로 사용합니다.

사용자 측에서는 이것이 재전송이 되던지 말던지 신경을 쓰지 않으나, 액터는 실패시 재전송을 할수있게 설계합니다.

이러한 전송 기법을 발사후 망각이라고도 불리며

Fire and forgot : 전투기 조종모델에서, 적군 탱크에 유도 미사일을 쏘고나면

알아서 맞을테니... 그것을 잊고 나는 살아야겠으니, 적전투기의 추격을 피하는데 집중을 하겠다.  

전투 전술용어라고 합니다. 이때 액터에서는 Tell 을 사용하며

반대로, 결과를 알고 싶으면 Ask를  사용하여 결과값을 체크하거나 기다릴수 있습니다.

이것은 비동기적으로 설계된 액터가, 언제든 동기적으로 운영이가능하며

동기적으로 작성된 함수도, 비동기적으로 변환이 가능합니다.  - Future and Promise#futureactor


액터 설계부분

여기서 언급되지 않는 데이터 구조체는, 위에서 언급한 At-Least-Once Delivery에서 가져옵니다.

카프카 전송모듈
public class KafkaSendActor : ReceiveActor
{
    private ILoggingAdapter log = Context.GetLogger();
    protected int messageCnt = 0;
    private KafkaOptions kfka_options;
    private BrokerRouter kfka_router;
    private Producer client;
    private string topic = "test";

    public KafkaSendActor()
    {
        kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
        kfka_router = new BrokerRouter(kfka_options);
        client = new Producer(kfka_router);
        IActorRef parentActor = null;
        
        Receive<Msg>(msg =>
        {
            parentActor = Sender;
            messageCnt++;
            //Test를 위해 메시지 중간에 드랍...
            if ( messageCnt % 2 == 0 )
            {
                log.Warning("Test for Drop Message");
                return;
            }
            log.Debug("Yes I can");
            HandleProducer(msg);                
        });

        Receive<Confirm>(msg =>
        {
            if( msg!=null)
                parentActor.Tell(msg, Self);
        });

    }
    public void HandleProducer( Msg msg)
    {
        Task.Run(async () =>
        {
            List<ProduceResponse> reponse = await client.SendMessageAsync( topic , new[] { new Message(msg.Message) });
            ProduceResponse resOne = reponse[0];
            Confirm confirm = new Confirm(msg.DeliveryId);
            if ( resOne.Error == 0 )
                return confirm;
            else
                return null;                
        }).PipeTo(Self);
    }        
}

카프카의 비동기 결과처리를 Pipe를 통해 연결하여 비동기처리가 유지되도록

카프카 전송모듈을 작성하고, 카프카로부터 전송 성공메시지를 받았을시 부모액터에게

해당메시지는 잘 처리되었다는 승인 메시지를 보냅니다. 승인메시지를 보내지 못할시 

부모액터는 실패한메시지를 재전송을 시도하게될것입니다.



재전송을 책임지는 액터
public class KafkaAtLeastOnceDeliveryReceiveActor : AtLeastOnceDeliveryReceiveActor
{
    private readonly IActorRef _destionationActor = Context.ActorOf<KafkaSendActor>();
    private ILoggingAdapter log = Context.GetLogger();

    public KafkaAtLeastOnceDeliveryReceiveActor()
    {
        Recover<MsgSent>(msgSent => Handler(msgSent));
        Recover<MsgConfirmed>(msgConfirmed => Handler(msgConfirmed));

        Command<string>(str =>
        {
            log.Debug("received:" + str);
            Persist(new MsgSent(str), Handler);
        });

        Command<Confirm>(confirm =>
        {
            //메시지 받음을 확인하고, 해당 메시지를 더이상 안보낸다.
            log.Debug("received confirm:" + confirm.DeliveryId);
            Persist(new MsgConfirmed(confirm.DeliveryId), Handler);
        });
    }

    private void Handler( MsgSent msgSent )
    {
        Deliver(_destionationActor.Path, l => new Msg(l, msgSent.Message));
    }

    private void Handler( MsgConfirmed msgConfirmed )
    {
        ConfirmDelivery(msgConfirmed.DeliveryId);
    }

    public override string PersistenceId { get; } = "persistence-id";
}

재전송 모듈 작성은 간단합니다.   AtLeastOnceDeliveryReceiveActor 라는 준비된 액터가 있어서 이것을 상속받아

우리가 원하는 몇가지 메시지 기능을 연결하면 됩니다.

고전적인 방법에서는 실패할시 재전송을 시도하였을것입니다. 하지만 실패원인을 모두 확인하는것은 불가능에 가깝고

AtLeastOnceDelivery 에서는 성공하지 못한(특정시간이내에) 메시지에대해서 재전송을 하게됩니다.


실패원인에 따라 다음과같이 개선점도 필요합니다. 직접 설계에 반영 해보세요

  • 물리적 네트워크 에러라고 판단시 대처전략과 이후 복구시 모두 보낼수 있는 방법?
  • 실패 처리에대해 특정 횟수이상 재시도 금지
  • 카프카의 세부적인(resOne.Error) 에러에대해 세부적인 에러 핸들링
  • 그럼에도 불구하고 중복메시지가 발생할수 있는 가능성 ( 카프카의 응답이느릴때, 메시지 승인처리 악순환이 반복되는 상황)

참고문서: https://doc.akka.io/docs/akka/current/persistence.html#at-least-once-delivery  (AKKA SCALA 의 오리지널 문서)


짝수번째 메시지를 최초 성공시키고... 드랍된 메시지(홀수번쨰)

도 결국 성공합니다. 아래 창은 실제 카프카의 전송받은 로그창이며 메시지가 중복 없음을 확인할수 있습니다.





초당 전송횟수 조절

Akka Stream의 Throttle 를 사용하며, 입력(소스)처리의 제한없이 

출력(sink,kafka 전송)량을 지정할수가 있습니다.

고전적인 방법에서는 타이머로 횟수를 제한하거나? ( 복잡도 증가 )

강제로 전송시 Sleep을 주었을 것입니다. ( 제어 주기조절이 어렵고, 블락발생)

AkkaStream은 처리 흐름제어를 조금더 유연하게 할수가 있습니다.


아래 샘플은 초당 1개의 메시지만 처리하도록준 옵션이며

단지, Throttle 라는 흐름량을 제어하는 밸브같은것을 추가하였습니다.

ActorSystem actorSystem;
ActorMaterializer materializer;
KafkaOptions kfka_options;
BrokerRouter kfka_router;

kfka_options = new KafkaOptions(new Uri("http://localhost:9092"), new Uri("http://localhost:9092"));
kfka_router = new BrokerRouter(kfka_options);
var client = new Producer(kfka_router);

actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 10));
var factorials = source.Scan(1, ( acc, next ) => acc + next);

factorials
     .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
     .RunForeach(i => {
         Console.WriteLine("SendOne");
         client.SendMessageAsync("test", new[] { new Message(i.ToString()) });
     }, materializer);


과제

  • 위 두가지 기능을 합하기 ( 전송보장 + 흐름 속도제어)
  • 역압력설정:카프카의 입력처리량보다 성공처리량이 늦을시 흐름속력을 늦춤 ( 성공승인 메시지 시간은 카프카 처리 최대 타임아웃으로 지정)






KAKFA를 AKKA(JAVA/SCALA)에서 확장하기

akka-stream-kafka는 현재 akka stream의 서브 프로젝트로  kafka client보다

더 유연한 사용패턴을 제공해줍니다. 외부 stream 서비스를 사용시 이것을 사용하는

시스템도 비동기 스트림처리기가 존재할시 더 유연하게 상호 운영이됩니다.


참고 : https://doc.akka.io/docs/akka-stream-kafka/current/producer.html

Akka Streams는 내부적으로 Reactive Streams 인터페이스를 사용하며, AKKA를 사용하는 최종 사용자를

대상으로 제공되며 두 인터페이스는 다르지만

서로 다른 스트리밍 구현이 상호 운영될수 있도록하는 Reactive Streams의 프로젝트의 기대치와  일치합니다. 

http://www.reactive-streams.org/