The approach introduced here does not yet support secure Kafka-SSL connections, and SASL connections to Azure Event Hubs for Kafka are likewise not supported.

It is useful for unsecured Kafka connections on a private network.

For a module that supports secure connections, see the following:

Azure EventHub(KAFKA) With Actor

Prerequisites:

  • Understand the Kafka system and try running it with Docker: Understanding KAFKA
  • Learn the basic setup of AkkaDotModule, which makes the Akka system simple to use: AkkaDotModule
  • Use the sample below to connect a Kafka stream to an Akka actor

Git : https://github.com/psmon/AkkaDotModule

Implementing the Consumer and Producer

The code below is all it takes to run a Kafka Consumer and Producer.

Kafka Stream

Using the actor system, Kafka can be used more simply and powerfully.
 
    // Kafka setup
    var consumerSystem = app.ApplicationServices.GetService<ConsumerSystem>();
    var producerSystem = app.ApplicationServices.GetService<ProducerSystem>();

    // Consumer: any number of consumers can be created
    consumerSystem.Start(new ConsumerAkkaOption()
    {
        KafkaGroupId = "testGroup",
        KafkaUrl = "kafka:9092",
        RelayActor = null,          // Consumed messages are relayed to this actor, so the handler is implemented as an actor
        Topics = "akka100"
    });

    // Producer: any number of producers can be created
    producerSystem.Start(new ProducerAkkaOption()
    {
        KafkaUrl = "kafka:9092",
        ProducerName = "producer1"
    });

    List<string> messages = new List<string>();
    messages.Add("testData");   // sample payload
    // Bonus: the production rate can be throttled
    // (useful when consumption must be regulated by controlling the rate of Kafka sends)
    int tps = 10;
    producerSystem.SinkMessage("producer1", "akka100", messages, tps);
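
Since consumed messages are relayed to the actor supplied as RelayActor, the handler is written as an actor. Below is a minimal sketch of such a handler, assuming the ConsumerSystem delivers each consumed record as a plain string; the actual message type (and the RelayActor property type) in AkkaDotModule may differ.

    using System;
    using Akka.Actor;

    // Sketch of a handler actor for consumed Kafka messages.
    // Assumption: each consumed record arrives as a string.
    public class KafkaMessageActor : ReceiveActor
    {
        public KafkaMessageActor()
        {
            Receive<string>(message =>
            {
                // Process the consumed Kafka record here.
                Console.WriteLine($"Consumed: {message}");
            });
        }
    }

An instance would then be passed in the option above, e.g. RelayActor = actorSystem.ActorOf(Props.Create(() => new KafkaMessageActor())).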


Kafka Client Configuration

The default configuration file works as-is; when performance or advanced behavior needs to change, it can be tuned through the settings below.

akka.kafka.conf - Kafka settings
akka.loglevel = ERROR

# // #producer-settings
# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Config path of Akka Discovery method
  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
  discovery-method = akka.discovery

  # Set a service name for use with Akka Discovery
  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning parameter of how many sends that can run in parallel.
  # In 2.0.0: changed the default from 100 to 10000
  parallelism = 10000

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 60s

  # Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false
  # when the producer instance is shared across multiple producer stages.
  close-on-producer-stop = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # for exactly-once-semantics processing.
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}
# // #producer-settings

# // #consumer-settings
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Config path of Akka Discovery method
  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
  discovery-method = akka.discovery

  # Set a service name for use with Akka Discovery
  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 50ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that non-zero value means that the thread that
  # is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 50ms

  # The stage will delay stopping the internal actor to allow processing of
  # messages already in the stream (required for successful committing).
  # This can be set to 0 for streams using `DrainingControl`.
  stop-timeout = 30s

  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with `CommitTimeoutException`.
  # The `Transactional.source` waits this amount of time for the producer to mark messages as not
  # being in flight anymore as well as waiting for messages to drain, when rebalance is triggered.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s

  # Not relevant for Kafka after version 2.1.0.
  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms

  # Limits the query to Kafka for a topic's position
  position-timeout = 5s

  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s

  # Timeout for akka.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s

  # Interval for checking that transaction was completed before closing the consumer.
  # Used in the transactional flow for exactly-once-semantics processing.
  eos-draining-check-interval = 30ms

  # Issue warnings when a call to a partition assignment handler method takes
  # longer than this.
  partition-handler-warning = 5s

  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {

    # Flag to turn on connection checker
    enable = false

    # Amount of attempts to be performed after a first connection failure occurs
    # Required, non-negative integer
    max-retries = 3

    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s

    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }

}
# // #consumer-settings

# // #committer-settings
# Properties for akka.kafka.CommitterSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.committer {

  # Maximum number of messages in a single commit batch
  max-batch = 1000

  # Maximum interval between commits
  max-interval = 10s

  # Parallelism for async committing
  parallelism = 100

  # API may change.
  # Delivery of commits to the internal actor
  # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
  # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
  delivery = WaitForAck
}
# // #committer-settings

# The dispatcher that will be used by default by consumer and
# producer stages.
akka.kafka.default-dispatcher {
  type = "Dispatcher"
  executor = "thread-pool-executor"

  thread-pool-executor {
    fixed-pool-size = 16
  }
}
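
If these defaults need to be overridden in code, one possible approach (a minimal sketch using Akka.NET's standard ConfigurationFactory; AkkaDotModule may wire its configuration differently, and the file name here is an assumption) is to parse the HOCON when creating the actor system:

    using System.IO;
    using Akka.Actor;
    using Akka.Configuration;

    class ConfigExample
    {
        static void Main()
        {
            // Parse the HOCON settings shown above (file name is an assumption).
            var config = ConfigurationFactory.ParseString(File.ReadAllText("akka.kafka.conf"));

            // An actor system created with this config exposes the tuned
            // akka.kafka.* values to the Kafka consumer/producer stages.
            var system = ActorSystem.Create("kafkaSystem", config);
        }
    }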


KAFKA Docker Compose

For a local development environment, Kafka is easy to start with Docker Compose.

version: '3.5'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
    - '2181:2181'
    environment:
    - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    hostname: kafka
    image: 'bitnami/kafka:latest'
    ports:
    - '9092:9092'
    environment:
    - KAFKA_ADVERTISED_HOST_NAME=kafka
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
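
Assuming the file above is saved as docker-compose.yml, the stack can be started with docker-compose up -d; the broker is then reachable on port 9092 under the advertised host name kafka.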
 

Kafka cluster for the development environment

Modify the hosts file

Because the broker advertises itself under the host name kafka, the development machine must resolve that name locally. Add the following entry to the hosts file:

127.0.0.1 kafka
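
To verify the entry, a quick connectivity check can be run from .NET (an illustrative sketch; a simple ping kafka works just as well):

    using System;
    using System.Net.Sockets;

    class HostCheck
    {
        static void Main()
        {
            // Resolves "kafka" via the hosts entry above and opens a TCP
            // connection to the broker port published by Docker Compose.
            using (var client = new TcpClient("kafka", 9092))
            {
                Console.WriteLine($"Connected to kafka:9092 = {client.Connected}");
            }
        }
    }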



Reference link: Alpakka, a module for using Kafka streams with Akka, is used under the hood.

