Azure 의 EventHub를 셋팅하고  .Net Core Akka.net 액터모델을 만들고 기능을 연결해보겠습니다.

EventHub를 사용하는 전반적인 느낌은, KAFKA의 기능을 흡수한 KAFKA-SASS 였으며 

KAFKA의 셋팅과 토픽관리를 Azure 포털로 간편하게 할수 있었습니다.

사용프레임워크

  • net core 3.1
  • akka.net
  • Confluent.Kafka


자바의 경우 다음 링크 AkkaStream With EventHub 에서 AKKAStream을 사용하여  연결가능하며


Akka.net에는 아직 EventHub(Kafka 모드)와 연결지원하는 모드를 찾지못하여

액터모델에 Kafka클라이언트를 탑재하여, 메시징 처리에유연한 액터모델버전으로 확장하였습니다.


Event Hub소개

Event Hubs는 구성 또는 관리 오버헤드가 거의 없는 완전 관리형 PaaS(Platform-as-a-Service)이므로 비즈니스 솔루션에 집중할 수 있습니다. Apache Kafka 에코시스템을 위한 Event Hubs는 클러스터를 관리, 구성 또는 실행할 필요가 없는 PaaS Kafka 환경을 제공합니다.

주요 아키텍처


개념적 맵핑

항목(Topic) 만들기

여기서 이벤트허브의 name은 , kafka의 topic에 해당하며, "akka100" 이라고 생성해두겠습니다. ( 아래 테스트 샘플에 사용되는 토픽명)

가격

카프카인터페이스를 활용하려면 기본이아닌 표준플랜을 사용해야 

Kafka 인터페이스가 활성화됩니다. ( 몇번 삽질끝에 알아냄, Kafka활용하려면 소비자 그룹 1은 부족하며 메시지당 이용가격은 동일하나 고정 인스턴스 비용이 2배이네요)

생성

BootstrapServers = "webnori-kafka.servicebus.windows.net:9093"

여기서 지정한 이름은 Kafka의 Brokers에 해당하는 서브도메인이 됩니다.

Port는 9093으로 SSL만 지원합니다. ( KAFKA기준 정확한 보안 프로토콜명은 : SaslSsl , Kafka의 기능을 사용하여 핸드쉐이크가 이루어짐)

모니터링

생성후 KAFKASUFACE가 활성화되는지 체크합니다.

( 저의경우 기본플랜으로 선택하였다가 리전변경및 여러번 실패후, 상위 플랜으로 재생성하여 활성화가 되었습니다.  -모든 리전지원합니다.   )


모든리전 지원하며, 기본플랜이아닌 표준플랜이상 KAFKA 인터페이스가 지원합니다.

양끝단을 Kafka 인터페이스로 맞추고, 클라우드로 관리되는 큐로 보여집니다.


Kafka의 Suface를 지원함으로, Kafka와 동일한 수준에서 다양한 클라이언트 연동이 가능합니다.

참고 : https://docs.microsoft.com/ko-kr/azure/event-hubs/apache-kafka-developer-guide


여기서는 닷넷용 KafkaClient를 활용하여, Akka.net 액터모델에 탑재를 시도해보겠습니다.

( Akka.net Stream에서 Kafka SaSSL연결이 지원되면 이 모듈사용을 권장합니다. - 작성기준 지원되지 않아 액터모델로 변종하였습니다.)

액터모델을 통해 Kafka 메시지 생성 - 닷넷코어


셋팅이 끝났으면, 닷넷 코어 어플리케이션을 통해 Auzre 이벤트 허브에 메시징을 생산하는 시도를 해보겠습니다.

사전준비

AkkaDotModule은 AKKA.net을 포함 KAFKA사용을 조금더 쉽게 사용하게끔 웹노리 주인장이 변종한 모듈이며

Akka.net의 기능과 파생된 녀석들이 방대하기때문에, 필요한만큼만 안정적인 버전으로 패키징한 모듈입니다.

https://www.nuget.org/packages/AkkaDotModule.Webnori/  -작성기준 1.0.9

Nuget을 통해 위 모듈을 이용하면, 여기 설명하는 베이스로  작동 가능합니다.


SAS 정책추가

            var producerAkkaOption = new ProducerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                ProducerName = "webnori-kafka",
                SecuritOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

위에 생성한 정책은 , 연결문자를 SaslPassword 부분에 붙여넣기 하면되겠습니다.

SSL을 위한 Global Root 인증서 : https://github.com/psmon/AkkaDotModule/blob/master/AkkaDotBootApi/cacert.pem


접근보안추가

공개할수 있는 주요 이유는, IP Base로 허용 네트워크를 한번더 걸어 놓았습니다.

IP허용을 추가로 걸어놓았기때문에 , 접속 String및 접근 토큰을 알고 있다고 하더라도 전송하지 못합니다.

( 2중보안이 권장되며, 공격대응에 비용이증가하는지 검토못하였으니 공격은 하지말아주세요 - 듀토리얼 소임을 다했으니, 소멸예정입니다. )

오토 스케일링은 클라우드 비용이기때문에 연습과정에서는 항상 Off해둡니다.

EventHub에 Kafka모드로 메시지 생산하기

EventHub(Kafka)에 가장 심플한 방식으로 전송되게 하려고 노력하였으며

아래와같이 사용하면 되겠습니다. Kafka와 호환되며 SecurityOption이 없으면 비SSL모드로 작동합니다.

            var producerAkkaOption = new ProducerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                ProducerName = "webnori-kafka",
                SecuritOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

            string producerActorName = "producerActor";

            var producerActor= AkkaLoad.RegisterActor(producerActorName /*AkkaLoad가 인식하는 유니크명*/,
                actorSystem.ActorOf(Props.Create(() => 
                    new ProducerActor(producerAkkaOption)),
                    producerActorName /*AKKA가 인식하는 Path명*/
            ));

            producerActor.Tell(new BatchData()
            {
                Data = new KafkaTextMessage()
                {
                    Topic = "akka100",
                    Message = "testData"
                }
            });

샘플 코드 : 액터모델로 작성될시 장점은, AkkaStream과 연동하여 속도조절기,버퍼방지등 여러가지 유용한 장치를 끝단에 연결할수 있기때문입니다.


Consumer 구현

            //ConsumerActor
            var consumerAkkaOption = new ConsumerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                Topics = "akka100",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                KafkaGroupId = "akakTestGroup",
                RelayActor = null,  //작업자 액터를 연결하면, 소비메시지가 작업자에게 전달된다 ( 컨슘기능과 작업자 기능의 분리)
                SecurityOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

            string consumerActorName = "consumerActor";
            var consumerActor = AkkaLoad.RegisterActor(consumerActorName /*AkkaLoad가 인식하는 유니크명*/,
                actorSystem.ActorOf(Props.Create(() =>
                    new ConsumerActor(consumerAkkaOption)),
                    consumerActorName /*AKKA가 인식하는 Path명*/
            ));

            //컨슈머를 작동시킨다.
            consumerActor.Tell(new ConsumerStart());

SubScibe를 하는 Consumer 기능을 위와 같은 코드로 작동가능합니다.

Consumer의 경우 메시징을 항상 청취하는 상태여야하기 때문에 , 생성후 ConsumerStart-청취시작 명령을 명시적으로 날려주면 

Consumer기능이 작동하기 시작합니다.

닷넷코어 API 에서 액터모델과 연동하여 EventHub에 카프카 모드로 전송

Auzre에서 Kafka 매트릭 모니터링을 기본으로 해줘서, 별도의 모니터링 시스템을 구축안해도되는게 장점인것같네요

메시징을 생산하여, 모니터링 시스템이 반응하여 메시징이 추가되면 연결및 구현성공입니다.

로깅을 통해 생산과 소비가 작동하는것을 함께 확인할수 있습니다.

샘플코드 : https://github.com/psmon/AkkaDotModule/blob/master/AkkaDotBootApi/Controllers/KafkaController.cs


<PackageReference Include="Confluent.Kafka" Version="1.5.2" />

Confluent.Kafka를 액터에 연결하여 전송하였으며, Actor모델을 사용안할시 위 모듈을 직접 이용하여

EventHub(Kafka)에 생산및 소비처리가 가능합니다.


Kafka를 액터에 연결한 목적

참고 : http://eyeahs.github.io/blog/2017/01/24/a-journey-into-reactive-streams/


KafkaClient를 액터화하면서, Akka의 Stream기에 연결될수 있으며 Akka의 스트림은 흐름속도제어를 포함하여 스트림에 필요한 다양한 기능을 제공합니다.

배압조절기는 실제 유체흐름을 안정적이고 빠르게 하기위해  존재하는장치이며

출력의 공간에 유체가 가득차게되면, 압력이 증가하고 출력의 흐름을 원활하게 하기위해

출력의 압력을 감지하여, 입력부의 유량을 동적으로 안정적으로 조절하는 장치를 이야기합니다.

참고링크