Azure 의 EventHub를 셋팅하고  .Net Core Akka.net 액터모델을 만들고 기능을 연결해보겠습니다.

EventHub를 사용하는 전반적인 느낌은, KAFKA의 기능을 흡수한 KAFKA-SASS 였으며 

KAFKA의 셋팅과 토픽관리를 Azure 포털로 간편하게 할수 있었습니다.

사용프레임워크

  • net core 3.1
  • akka.net
  • Confluent.Kafka


자바의 경우 다음 링크 AkkaStream With EventHub 에서 AKKAStream을 사용하여  연결가능하며


Akka.net에는 아직 EventHub(Kafka 모드)와 연결지원하는 모드를 찾지못하여

액터모델에 Kafka클라이언트를 탑재하여, 메시징 처리에유연한 액터모델버전으로 확장하였습니다.


Event Hub소개

Event Hubs는 구성 또는 관리 오버헤드가 거의 없는 완전 관리형 PaaS(Platform-as-a-Service)이므로 비즈니스 솔루션에 집중할 수 있습니다. Apache Kafka 에코시스템을 위한 Event Hubs는 클러스터를 관리, 구성 또는 실행할 필요가 없는 PaaS Kafka 환경을 제공합니다.

주요 아키텍처

  • 이벤트 생산자: 이벤트 허브에 데이터를 보내는 엔터티입니다. 이벤트 게시자는 HTTPS 또는 AMQP 1.0 또는 Apache Kafka 1.0 이상을 사용하여 이벤트를 게시할 수 있습니다.
  • 파티션: 각 소비자는 메시지 스트림의 특정 하위 세트 또는 파티션만 읽습니다.
  • 소비자 그룹: 전체 이벤트 허브의 보기(상태, 위치 또는 오프셋)입니다. 소비자 그룹을 사용하면 각기 별도의 이벤트 스트림 보기가 표시되는 애플리케이션을 사용할 수 있습니다. 소비자는 자신의 속도로 자신의 오프셋을 통해 독립적으로 스트림을 읽습니다.
  • 처리량 단위: Event Hubs의 처리량 용량을 제어하는 미리 구입한 용량 단위입니다.
  • 이벤트 수신기: 이벤트 허브에서 이벤트 데이터를 읽는 엔터티입니다. 모든 Event Hubs 소비자는 AMQP 1.0 세션을 통해 연결합니다. Event Hubs 서비스는 사용할 수 있게 되면 세션을 통해 이벤트를 제공합니다. 모든 Kafka 소비자는 Kafka 프로토콜 1.0 이상을 통해 연결합니다.


개념적 맵핑

항목(Topic) 만들기

여기서 이벤트허브의 name은 , kafka의 topic에 해당하며, "akka100" 이라고 생성해두겠습니다. ( 아래 테스트 샘플에 사용되는 토픽명)

가격

카프카인터페이스를 활용하려면 기본이아닌 표준플랜을 사용해야 

Kafka 인터페이스가 활성화됩니다. ( 몇번 삽질끝에 알아냄, Kafka활용하려면 소비자 그룹 1은 부족하며 메시지당 이용가격은 동일하나 고정 인스턴스 비용이 2배이네요)

생성

BootstrapServers = "webnori-kafka.servicebus.windows.net:9093"

여기서 지정한 이름은 Kafka의 Brokers에 해당하는 서브도메인이 됩니다.

Port는 9093으로 SSL만 지원합니다. ( KAFKA기준 정확한 보안 프로토콜명은 : SaslSsl , Kafka의 기능을 사용하여 핸드쉐이크가 이루어짐)

모니터링

생성후 KAFKASUFACE가 활성화되는지 체크합니다.

( 저의경우 기본플랜으로 선택하였다가 리전변경및 여러번 실패후, 상위 플랜으로 재생성하여 활성화가 되었습니다.  -모든 리전지원합니다.   )


모든리전 지원하며, 기본플랜이아닌 표준플랜이상 KAFKA 인터페이스가 지원합니다.

양끝단을 Kafka 인터페이스로 맞추고, 클라우드로 관리되는 큐로 보여집니다.


Kafka의 Suface를 지원함으로, Kafka와 동일한 수준에서 다양한 클라이언트 연동이 가능합니다.

참고 : https://docs.microsoft.com/ko-kr/azure/event-hubs/apache-kafka-developer-guide


여기서는 닷넷용 KafkaClient를 활용하여, Akka.net 액터모델에 탑재를 시도해보겠습니다.

( Akka.net Stream에서 Kafka SaSSL연결이 지원되면 이 모듈사용을 권장합니다. - 작성기준 지원되지 않아 액터모델로 변종하였습니다.)

액터모델을 통해 Kafka 메시지 생성 - 닷넷코어


셋팅이 끝났으면, 닷넷 코어 어플리케이션을 통해 Auzre 이벤트 허브에 메시징을 생산하는 시도를 해보겠습니다.

사전준비

AkkaDotModule은 AKKA.net을 포함 KAFKA사용을 조금더 쉽게 사용하게끔 웹노리 주인장이 변종한 모듈이며

Akka.net의 기능과 파생된 녀석들이 방대하기때문에, 필요한만큼만 안정적인 버전으로 패키징한 모듈입니다.

https://www.nuget.org/packages/AkkaDotModule.Webnori/  -작성기준 1.0.9

Nuget을 통해 위 모듈을 이용하면, 여기 설명하는 베이스로  작동 가능합니다.


SAS 정책추가

            var producerAkkaOption = new ProducerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                ProducerName = "webnori-kafka",
                SecuritOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

위에 생성한 정책은 , 연결문자를 SaslPassword 부분에 붙여넣기 하면되겠습니다.

SSL을 위한 Global Root 인증서 : https://github.com/psmon/AkkaDotModule/blob/master/AkkaDotBootApi/cacert.pem


접근보안추가

공개할수 있는 주요 이유는, IP Base로 허용 네트워크를 한번더 걸어 놓았습니다.

IP허용을 추가로 걸어놓았기때문에 , 접속 String및 접근 토큰을 알고 있다고 하더라도 전송하지 못합니다.

( 2중보안이 권장되며, 공격대응에 비용이증가하는지 검토못하였으니 공격은 하지말아주세요 - 듀토리얼 소임을 다했으니, 소멸예정입니다. )

오토 스케일링은 클라우드 비용이기때문에 연습과정에서는 항상 Off해둡니다.

EventHub에 Kafka모드로 메시지 생산하기

EventHub(Kafka)에 가장 심플한 방식으로 전송되게 하려고 노력하였으며

아래와같이 사용하면 되겠습니다. Kafka와 호환되며 SecurityOption이 없으면 비SSL모드로 작동합니다.

            var producerAkkaOption = new ProducerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                ProducerName = "webnori-kafka",
                SecuritOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

            string producerActorName = "producerActor";

            var producerActor= AkkaLoad.RegisterActor(producerActorName /*AkkaLoad가 인식하는 유니크명*/,
                actorSystem.ActorOf(Props.Create(() => 
                    new ProducerActor(producerAkkaOption)),
                    producerActorName /*AKKA가 인식하는 Path명*/
            ));

            producerActor.Tell(new BatchData()
            {
                Data = new KafkaTextMessage()
                {
                    Topic = "akka100",
                    Message = "testData"
                }
            });

샘플 코드 : 액터모델로 작성될시 장점은, AkkaStream과 연동하여 속도조절기,버퍼방지등 여러가지 유용한 장치를 끝단에 연결할수 있기때문입니다.


Consumer 구현

            //ConsumerActor
            var consumerAkkaOption = new ConsumerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                Topics = "akka100",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                KafkaGroupId = "akakTestGroup",
                RelayActor = null,  //작업자 액터를 연결하면, 소비메시지가 작업자에게 전달된다 ( 컨슘기능과 작업자 기능의 분리)
                SecurityOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

            string consumerActorName = "consumerActor";
            var consumerActor = AkkaLoad.RegisterActor(consumerActorName /*AkkaLoad가 인식하는 유니크명*/,
                actorSystem.ActorOf(Props.Create(() =>
                    new ConsumerActor(consumerAkkaOption)),
                    consumerActorName /*AKKA가 인식하는 Path명*/
            ));

            //컨슈머를 작동시킨다.
            consumerActor.Tell(new ConsumerStart());

SubScibe를 하는 Consumer 기능을 위와 같은 코드로 작동가능합니다.

Consumer의 경우 메시징을 항상 청취하는 상태여야하기 때문에 , 생성후 ConsumerStart-청취시작 명령을 명시적으로 날려주면 

Consumer기능이 작동하기 시작합니다.

닷넷코어 API 에서 액터모델과 연동하여 EventHub에 카프카 모드로 전송

Auzre에서 Kafka 매트릭 모니터링을 기본으로 해줘서, 별도의 모니터링 시스템을 구축안해도되는게 장점인것같네요

메시징을 생산하여, 모니터링 시스템이 반응하여 메시징이 추가되면 연결및 구현성공입니다.

로깅을 통해 생산과 소비가 작동하는것을 함께 확인할수 있습니다.

샘플코드 : https://github.com/psmon/AkkaDotModule/blob/master/AkkaDotBootApi/Controllers/KafkaController.cs


<PackageReference Include="Confluent.Kafka" Version="1.5.2" />

Confluent.Kafka를 액터에 연결하여 전송하였으며, Actor모델을 사용안할시 위 모듈을 직접 이용하여

EventHub(Kafka)에 생산및 소비처리가 가능합니다.


Kafka를 액터에 연결한 목적

참고 : http://eyeahs.github.io/blog/2017/01/24/a-journey-into-reactive-streams/


KafkaClient를 액터화하면서, Akka의 Stream기에 연결될수 있으며 Akka의 스트림은 흐름속도제어를 포함하여 스트림에 필요한 다양한 기능을 제공합니다.

  • 기본모드 : 기본적인 KafkaClient를 사용하면 생산과 소비기능만을 사용하게되며, 소비할시 별도의 로직처리를 하지 않는다고 하면 블락킹 모드로 작동될수 있습니다.
  • 확장 : 생산과 소비의 입출력에 액터를 연결함으로 생산과 소비의 속도조절을 할수 있게되며, 작업자와 분리를 할수 있습니다.
  • 배압장치(역압력을 통한 속도제어) : 출력버퍼가 비워지지않고 느리게 처리될시, 이를 동적으로 감지하고 소비를 적정으로 맞춰야 더 빠르고 안정적으로 처리된다는 이론이며
    이것은 실제 존재하는 장치에서 아이디어를 얻어왔으며, 리액티브스트림이 준수해야할 스펙중 하나입니다.

배압조절기는 실제 유체흐름을 안정적이고 빠르게 하기위해  존재하는장치이며

출력의 공간에 유체가 가득차게되면, 압력이 증가하고 출력의 흐름을 원활하게 하기위해

출력의 압력을 감지하여, 입력부의 유량을 동적으로 안정적으로 조절하는 장치를 이야기합니다.

참고링크



  • No labels