Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JVM(자바+코틀린) 영역이 지원하는 동시성 프로그래밍 방식을 먼저 알아보고 액터모델에 이벤트를 전송해 결과값을 받는 ASK패턴도 알아보겠습니다.

🧩 1. 전체 테스트 구조

이 파일은 코루틴, 리액터(Reactor), CompletableFuture, Java Stream, Akka Streams, 액터(Actor) 등을 사용해서
"비동기 동시성" 그리고 "액터(Actor) 기반 메시지 통신" 테스트를 다양한 방법으로 검증하는 것입니다.

...

구분내용
기초 비동기 처리Coroutine, Mono, CompletableFuture, Java Stream으로 "Hello World"를 만드는 간단한 동시성 테스트
병렬 실행 테스트3개의 작업(문자열 생성+지연 메소드 호출 2개)을 병렬로 수행하여 하나의 문자열로 합치는 테스트
Akka Stream 테스트Akka Streams를 사용하여 비동기 스트림 병합 테스트
Actor Ask 패턴 테스트HelloActor를 대상으로 ask (질문→응답받기) 패턴 테스트
Kotlin 기반 커스텀 KHelloActor 테스트별도로 작성한 코루틴 기반 액터 (KHelloActor) 테스트

...

🧩 2. 주요 테스트 코드 설명

(1) 기본 비동기 처리 (Hello → Hello World)


방법요약
Coroutineasync로 비동기 작업하고 await()로 결과 가져옴
Mono리액터 기반 Mono 스트림 처리
CompletableFuture자바 기본 비동기 API 활용
Java Stream스트림 map 연산으로 변환 처리 후 CompletableFuture

...

Code Block
themeEmacs
titletestHelloByJavaStream
   @Test
    fun testHelloByJavaStream() {
        // Test using CompletableFuture with Java Streams to concatenate strings asynchronously
        val future = CompletableFuture.supplyAsync {
            listOf("Hello")
                .stream()
                .map { it + " World" }
                .findFirst()
                .orElse("")
        }
        val result = future.get()
        assertEquals("Hello World", result)
    }


...

(2) 동시성 처리 (Concurrency)

getDelayString()getDelayString2()는 각각 600ms, 500ms 지연 후 "SlowWorld1", "SlowWorld2"를 반환합니다.

...

Code Block
themeEmacs
titletestConcurrentByAkkaStream
    @Test
    fun testConcurrentByAkkaStream() {
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .map { it + " World" }
            .zipWith(Source.single(getDelayString())) { part1, part2 -> Pair(part1, part2) }
            .zipWith(Source.single(getDelayString2())) { pair, part3 ->
                "${pair.first} ${pair.second} $part3"
            }
            .runWith(Sink.head(), materializer)

        val result = source.toCompletableFuture().get()

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }



...

(3) 액터(Actor) 기반 메시지 송수신 테스트

HelloActor는 다음 특징을 갖습니다​HelloActor:

...

Pekko의 AskPattern.ask 메소드를 활용하고, Duration 타임아웃 설정도 적용함


testActorAskByCoroutine

Code Block
themeEmacs
titletestActorAskByCoroutine
    @Test
    fun testActorAskByCoroutine() = runBlocking {
        // Test using Actor Ask pattern with coroutines
        val actorRef = actorSystem.spawn(HelloActor.create())
        var response = AskPattern.ask(
            actorRef,
            { replyTo: ActorRef<HelloCommand> -> Hello("Hello", replyTo) },
            Duration.ofSeconds(3),
            actorSystem.scheduler()
        ).toCompletableFuture().await()

        assertEquals(HelloResponse("Kotlin"), response)
    }


testActorAskByWebflux

Code Block
themeEmacs
titletestActorAskByWebflux
    @Test
    fun testActorAskByWebflux() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val result = Mono.just("Hello")
            .flatMap { input ->
                Mono.fromFuture(
                    AskPattern.ask(
                        actorRef,
                        { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                        Duration.ofSeconds(3),
                        actorSystem.scheduler()
                    ).toCompletableFuture()
                )
            }
            .map { response -> (response as HelloResponse).message }
            .block()

        assertEquals("Kotlin", result)
    }



testActorAskByCompletableFuture

Code Block
themeEmacs
titletestActorAskByCompletableFuture
    @Test
    fun testActorAskByCompletableFuture() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenCompose { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .thenApply { response -> (response as HelloResponse).message }

        val result = future.get()
        assertEquals("Kotlin", result)
    }


testActorAskByJavaStream

Code Block
themeEmacs
titletestActorAskByJavaStream
    @Test
    fun testActorAskByJavaStream() {
        val actorRef = actorSystem.spawn(HelloActor.create())

        val result = listOf("Hello")
            .stream()
            .map { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture().join()
            }
            .map { response -> (response as HelloResponse).message }
            .findFirst()
            .orElse("")

        assertEquals("Kotlin", result)
    }


testActorAskByAkkaStream

Code Block
themeEmacs
titletestActorAskByAkkaStream
     @Test
    fun testActorAskByAkkaStream() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .mapAsync(1) { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .map { response -> (response as HelloResponse).message }
            .runWith(Sink.head(), materializer)

        val result = source.toCompletableFuture().get()
        assertEquals("Kotlin", result)
    } 


testActorAskByAkkaStreamWithCoroutine

Code Block
themeEmacs
titletestActorAskByAkkaStreamWithCoroutine
    @Test
    fun testActorAskByAkkaStreamWithCoroutine() = runBlocking {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .mapAsync(1) { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .map { response -> (response as HelloResponse).message }
            .runWith(Sink.head(), materializer)

        val result = source.await()
        assertEquals("Kotlin", result)
    }


...

(4) 코루틴 액터 (KHelloActor) 테스트

추가로 만든 **KHelloActor**는 별도 코루틴 기반 액터입니다.

...

  1. KHelloActor를 생성하고 start() (코루틴 Job 시작)

  2. Channel을 통해 KHello 메시지를 보냄

  3. 응답으로 KHelloResponse를 수신

  4. 결과를 검증한 후 리소스 정리 (closecancel)


testKHelloActor

Code Block
themeEmacs
titletestKHelloActor
    @Test
    fun testKHelloActor() = runBlocking {
        // KHelloActor 생성 및 시작
        val actor = KHelloActor()
        val replyChannel = Channel<KHelloResponse>()

        val job = actor.start() // start 메서드가 Job을 반환하도록 수정

        try {
            // 메시지 전송 및 응답 수신
            withTimeout(3000) { // 타임아웃 설정
                actor.send(KHello("Hello", replyChannel))
                val response = replyChannel.receive()

                // 응답 검증
                assertEquals("Kotlin", response.message)
            }
        } finally {
            // 리소스 정리
            replyChannel.close()
            actor.close()
            job.cancel() // 액터의 코루틴 종료
        }
    }



...

🧩 3. 관련 클래스/구성요소 요약


클래스설명
HelloActorPekko 기반의 기본 액터 (HelloCommand 수신, Kotlin 응답)
BulkProcessor(참조만) 대량 이벤트를 버퍼링하여 플러시하는 액터​BulkProcessor
HelloRouter라우팅 풀(RoundRobin)로 여러 액터에 분산 처리하는 액터​HelloRouter
KHelloActor코루틴 기반 간단한 액터 (Virtual Thread 스타일 대응 실험)

각 클래스에 대해서 별도 HelloActorTest.kt, BulkProcessorTest.kt, HelloRouterTest.kt 테스트 파일로도 검증을 하고 있었습니다​HelloActorTestBulkProcessorTestHelloRouterTest.

...

🧩 4. 요약

✅ 이 테스트 파일은
"비동기/동시성" 기술과 "액터모델" 메시지 송수신을 다양한 방법으로 실습하고 검증하는 포괄적 예제입니다.

...

여기서 소개하는 전체코드는 다음경로에서 확인할수 있으며 유닛테스트를 통해 실행해볼수 있습니다.

전체코드

...