JAVA에서 KAFKA활용을 하기 위한 툴중하나인~ Alpakka Kafka를 활용하는  UnitTest기를 작성해보았습니다.


로컬환경 Kafka준비


version: '3.5'

services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000


  kafka-1:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3


  kafka-2:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - '9093:9093'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3


  kafka-3:
    image: confluentinc/cp-kafka:5.5.1
    ports:
      - '9094:9094'
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_NUM_PARTITIONS: 3

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8989:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-1:29092,kafka-2:29093,kafka-3:29094
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-1:22181


kafka ui툴을 내장하였으며 

docker-compose up -d 명령을 통해 kafka 클러스터 실행이 가능합니다. 

Kafka 메시지 100개 생성하기



Junit Base + Akka UnitTest 에서 작성이되며 , 블락킹 하지않고 작동이되기때문에 복잡한 메시지 테스트도 수초이내에 완료됩니다.


    @Test
    @DisplayName("KafkaProduce - 100개의 메시지생산이 처리완료")
    public void TestIt() {
        new TestKit(system) {
            {
                final TestKit probe = new TestKit(system);
                final ActorRef greetActor = AkkaManager.getInstance().getGreetActor();

                greetActor.tell(probe.getRef(), getRef());
                expectMsg(Duration.ofSeconds(1), "done");

                final Config config = system.settings().config().getConfig("akka.kafka.producer");
                final ProducerSettings<String, String> producerSettings =
                        ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
                                .withBootstrapServers("localhost:9092");

                String topic = "test-1";

                CompletionStage<Done> done =
                        Source.range(1, 100)
                                .map(number -> number.toString())
                                .map(value -> new ProducerRecord<String, String>(topic, value))
                                .runWith(Producer.plainSink(producerSettings), system);

                Source<Done, NotUsed> source = Source.completionStage(done);

                within(
                        Duration.ofSeconds(10),
                        () -> {

                            Sink<Done, CompletionStage<Done>> sink = Sink.foreach(i ->
                                    greetActor.tell("hello", getRef())
                            );
                            source.runWith(sink, system); // 10

                            // check that the probe we injected earlier got the msg
                            probe.expectMsg(Duration.ofSeconds(5), "world");

                            // Will wait for the rest of the 3 seconds
                            expectNoMessage();
                            return null;
                        });
            }
        };
    }


생산된 Kafka Message 확인

생성한 메시지를 Kafka툴을 통해 확인할수 있습니다. 

더 정확한 유닛테스트는 생산자와 소비자를 모두 등록하여, 생산한 메시지의 카운트를 최종 소비자를 통해 검증하는것입니다.


Alpakka이용시 Kafka의 기능을 이용하여 AtLeastOnce 전략을 사용할수 있습니다.


생산자와 소비자를 함께 테스트하기

    void debugKafkaMsg(String key, String value, ActorRef greet, String testKey) {
        System.out.printf("pringKafka with Key-Value : %s-%s%n", key, value);

        //테스트키 동일한것만 카운트 확인..(테스트마다 Kafka고유키 사용)
        if(testKey.equals(key)) greet.tell("hello", null);

    }

    @Test
    @DisplayName("KafkaProduce - 100개의 메시지생산을하고 100개의 메시지소비테스트 확인")
    public void TestKafkaProduceAndConsume() {
        new TestKit(system) {
            {
                final TestKit probe = new TestKit(system);
                final ActorRef greetActor = AkkaManager.getInstance().getGreetActor();
                final int testCount = 100;
                final String testKey = java.util.UUID.randomUUID().toString();
                final String testKafkaServer = "localhost:9092";
                final String testGroup = "group1";

                greetActor.tell(probe.getRef(), getRef());
                expectMsg(Duration.ofSeconds(1), "done");

                final Config producerConfig = system.settings().config().getConfig("akka.kafka.producer");
                final ProducerSettings<String, String> producerSettings =
                        ProducerSettings.create(producerConfig, new StringSerializer(), new StringSerializer())
                                .withBootstrapServers(testKafkaServer);


                final Config conSumeConfig = system.settings().config().getConfig("akka.kafka.consumer");
                final ConsumerSettings<String, String> consumerSettings =
                        ConsumerSettings.create(conSumeConfig, new StringDeserializer(), new StringDeserializer())
                                .withBootstrapServers(testKafkaServer)
                                .withGroupId(testGroup)
                                .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                                .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000")
                                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


                String topic = "test-1";

                //Consumer Setup
                Consumer
                        .plainSource(
                                consumerSettings,
                                Subscriptions.topics(topic))
                        .to(Sink.foreach(msg ->
                                debugKafkaMsg(msg.key(), msg.value(), greetActor, testKey))
                        )
                        .run(system);

                //Producer Setup
                CompletionStage<Done> done =
                        Source.range(1, testCount)
                                .map(number -> number.toString())
                                .map(value -> new ProducerRecord<String, String>(topic, testKey, value))
                                .runWith(Producer.plainSink(producerSettings), system);

                //Producer Task Setup
                Source<Done, NotUsed> source = Source.completionStage(done);

                within(
                        Duration.ofSeconds(10),
                        () -> {

                            Sink<Done, CompletionStage<Done>> sink = Sink.foreach(i ->
                                    System.out.println("생산완료")
                            );

                            //For Clean Test - 3초후 메시지생성
                            expectNoMessage(Duration.ofSeconds(3));

                            // Kafka 생산시작
                            source.runWith(sink, system);

                            // Kafka 소비 메시지 확인(100)
                            for (int i = 0; i < testCount; i++) {
                                probe.expectMsg(Duration.ofSeconds(5), "world");
                            }

                            return null;
                        });
            }
        };
    }







단일토픽 멀티 파티션

이 테스트기의 목적은 AkkaStream을 탑재하여 작동코드를 테스트 해볼수 있지만, Kafka를 이용하기 위해 꼭 이 스트림기를 이용할 필요는 없습니다.

Kafka를 활용하는 방법은 다양할수 있으며 Kafka의 고급옵션들이 실제 의도된대로 작동하는지 Kafka 작동을 확인할수 있는 학습용도로만 이용할수도 있습니다.


                            // Kafka 생산시작
                            source.runWith(sink, system);   //파티션 0
                            source2.runWith(sink, system);  //파티션 1

                            // Kafka 소비 메시지 확인 -
                            for (int i = 0; i < testCount * partitionCount; i++) {
                                probe.expectMsg(Duration.ofSeconds(5), "world");
                            }


단일 토픽에 파티션을 N개 소비테스트가 필요할때 파티션 지정을 하여 N개 구동할수 있습니다. 

N개의 소비자를 작동 할수 있으며 총 생산 메시지를 검증할수 있습니다.

지원되는 Consumer Type : https://doc.akka.io/docs/alpakka-kafka/current/consumer.html


카프카 옵션

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Config path of Akka Discovery method
  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
  discovery-method = akka.discovery

  # Set a service name for use with Akka Discovery
  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning parameter of how many sends that can run in parallel.
  # In 2.0.0: changed the default from 100 to 10000
  parallelism = 10000

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 60s

  # Call `KafkaProducer.close` when the stream is shutdown. This is important to override to false
  # when the producer instance is shared across multiple producer stages.
  close-on-producer-stop = true

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  # for exactly-once-semantics processing.
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
  }
}

akka.kafka.consumer {

  enable.auto.commit = true

  kafka-clients {
    bootstrap.servers = "localhost:9092"
  }
}

akka.kafka.committer {

  # Maximum number of messages in a single commit batch
  max-batch = 1000

  # Maximum interval between commits
  max-interval = 3s

  # Parallelsim for async committing
  parallelism = 100

  # API may change.
  # Delivery of commits to the internal actor
  # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
  # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
  delivery = WaitForAck

  # API may change.
  # Controls when a `Committable` message is queued to be committed.
  # OffsetFirstObserved: When the offset of a message has been successfully produced.
  # NextOffsetObserved: When the next offset is observed.
  when = OffsetFirstObserved
}
                final Config conSumeConfig = system.settings().config().getConfig("akka.kafka.consumer");
                final ConsumerSettings<String, String> consumerSettings =
                        ConsumerSettings.create(conSumeConfig, new StringDeserializer(), new StringDeserializer())
                                .withBootstrapServers(testKafkaServer)
                                .withGroupId(testGroup)
                                .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
                                .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000")
                                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

카프카의 옵션을 기본값으로 정의할수 있으며 코드를 통해 특정(생산/소비/커밋방식) 작동만 변경할수 있습니다.



보너스편 - ChatGPT에게 작성한 유닛테스트 코드를 분석의뢰

요즘 ChatGPT가 설명을 더 잘한는것 같아 보너스로 분석 의뢰를 해보았습니다.


Q> 위 코드를 분석해죠

ChatGPT> 이 코드는 Akka-Streams와 Akka-Kafka를 사용하여 Kafka 프로듀서 및 컨슈머를 테스트하는 것입니다.


첫 번째 테스트인 "TestKafkaProduce"에서는 Kafka 프로듀서를 생성하고 1에서 100까지의 메시지를 생성하여

Kafka에 기록하고 작업이 완료되면 Kafka 컨슈머를 시작하여 기록한 메시지를 확인합니다. 메시지가 성공적으로 처리되면 "hello"라는 메시지를 Akka 액터에게 보냅니다.


두 번째 테스트 "TestKafkaProduceAndConsume"에서는 Kafka 프로듀서를 생성하고 1에서 100까지의 메시지를 생성하여 Kafka에 기록합니다

. 그런 다음 Kafka 컨슈머를 시작하고 Kafka에서 생성한 메시지를 확인하여 모든 메시지가 성공적으로 처리되었는지 확인합니다. 이 과정에서 "hello"라는 메시지를 Akka 액터에게 보내어 확인합니다.


Kafka 개발을 위한 선택


Akka가 JAVA호환이기도 하지만 Scala 기반으로 작성되었기때문에 JAVA와 SCALA의 언어스펙 차이 따라 동일기능을 수행하더라도 JAVA코드로 작성된 코드가 가독성이 떨어지고 복잡합니다.

오히려 자바보다 닷넷으로 포팅된 AKKA가 SCALA작성법과 유사하며

참고 : https://getakka.net/community/contributing/porting-guide.html


AKKA의 Reactive지원이 강력하긴하지만 JAVA SPRING에서의 KAFKA 활용 구현체의 난이도를 낮추는 것은 Webflux를 대안으로 추천합니다.


참고링크 :


상위 페이지 : alpakka




  • No labels
Write a comment…