Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...


Consumer 구현

Code Block
themeEmacs
            //ConsumerActor
            var consumerAkkaOption = new ConsumerAkkaOption()
            {
                BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
                Topics = "akka100",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                KafkaGroupId = "akakTestGroup",
                RelayActor = null,  //작업자 액터를 연결하면, 소비메시지가 작업자에게 전달된다 ( 컨슘기능과 작업자 기능의 분리)
                SecurityOption = new KafkaSecurityOption()
                {
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = "$ConnectionString",
                    SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
                    SslCaLocation = "./cacert.pem"
                }
            };

            string consumerActorName = "consumerActor";
            var consumerActor = AkkaLoad.RegisterActor(consumerActorName /*AkkaLoad가 인식하는 유니크명*/,
                actorSystem.ActorOf(Props.Create(() =>
                    new ConsumerActor(consumerAkkaOption)),
                    consumerActorName /*AKKA가 인식하는 Path명*/
            ));

            //컨슈머를 작동시킨다.
            consumerActor.Tell(new ConsumerStart());

SubScibe를 하는 Consumer 기능을 위와 같은 코드로 작동가능합니다.

Consumer의 경우 메시징을 항상 청취하는 상태여야하기 때문에 , 생성후 ConsumerStart-청취시작 명령을 명시적으로 날려주면 

Consumer기능이 작동하기 시작합니다.

닷넷코어 API 에서 액터모델과 연동하여 EventHub에 카프카 모드로 전송

...

메시징을 생산하여, 모니터링 시스템이 반응하여 메시징이 추가되면 연결및 구현성공입니다.

로깅을 통해 생산과 소비가 작동하는것을 함께 확인할수 있습니다.

샘플코드 : https://github.com/psmon/AkkaDotModule/blob/master/AkkaDotBootApi/Controllers/KafkaController.cs

...

EventHub(Kafka)에 생산및 소비처리가 가능합니다.

...

titlenext update 예정

...

.



참고링크

...