시뮬레이션 : https://famous-caramel-e50cb4.netlify.app/ 

  • 요즘 유행하는 코드생성기 Bolt로 이 개념을 설명하기위해 작성해보았습니다.
    • 한명이 모든일을 하는 바리스타와 주문하는사람은 따로있고 바리스타가 분리되어 운영하는 커피숍


동시성처리와 병렬성처리는 유사해보이지만 큰 차이가있으며 다음과같이 간략하게 설명될수 있습니다.

  • 동시처리 : 한사람이 커피주문 받고 만들기를 동시에한다. 커피머신을 이용하면서도 주문을 받음 - 비동기 논블록킹 프로그래밍
    • 주의 : 블락킹이 하나라도 발생하면 전체가 함께 지연됨
  • 병렬처리 : 커피주문을 받는사람과 만드는 사람이 각각 따로 있다.  - 병렬처리 멀티스레드 프로그래밍
    • 주의 : 작업자 하나에 블락킹이 발생해도 다른작업가 작업을 진행하는것에 영향을 주지않지만 작업처리간 자원 공유문제를 잘 해결하지 못하면 데드락및 궁핍현상등이 발생해 전체작업이 중단될수도 있습니다.
  • 순차처리(동기순차 블락킹 프로그래밍) : 한사람이 커피주문을 받고 커피를 만들지만~ 커피가 완성될때까지 다음손님 주문을 받지 못한다.
    • 동시/병렬로 진행되지 않음으로 전체 소요시간은 각 Task의 전체합으로 전체완료시간이 가장느리며
    • 병렬프로세싱은 가장빠른 전체완료시간을 보장할수 있지만, 동시성처리는 더 적은 자원으로 병렬프로세싱과 유사한 효율성을 가질수 있습니다. 


JVM(자바+코틀린) 영역이 지원하는 동시성 프로그래밍 방식을 먼저 알아보고 액터모델에 이벤트를 전송해 결과값을 받는 ASK패턴도 알아보겠습니다.

🧩 1. 전체 테스트 구조

이 파일은 코루틴, 리액터(Reactor), CompletableFuture, Java Stream, Akka Streams, 액터(Actor) 등을 사용해서
"비동기 동시성" 그리고 "액터(Actor) 기반 메시지 통신" 테스트를 다양한 방법으로 검증하는 것입니다.

구체적으로는:


구분내용
기초 비동기 처리Coroutine, Mono, CompletableFuture, Java Stream으로 "Hello World"를 만드는 간단한 동시성 테스트
병렬 실행 테스트3개의 작업(문자열 생성+지연 메소드 호출 2개)을 병렬로 수행하여 하나의 문자열로 합치는 테스트
Akka Stream 테스트Akka Streams를 사용하여 비동기 스트림 병합 테스트
Actor Ask 패턴 테스트HelloActor를 대상으로 ask (질문→응답받기) 패턴 테스트
Kotlin 기반 커스텀 KHelloActor 테스트별도로 작성한 코루틴 기반 액터 (KHelloActor) 테스트

🧩 2. 주요 테스트 코드 설명

(1) 기본 비동기 처리 (Hello → Hello World)


방법요약
Coroutineasync로 비동기 작업하고 await()로 결과 가져옴
Mono리액터 기반 Mono 스트림 처리
CompletableFuture자바 기본 비동기 API 활용
Java Stream스트림 map 연산으로 변환 처리 후 CompletableFuture

공통 목표: "Hello World" 문자열 생성 및 검증


testHelloByCoroutine
    @Test
    fun testHelloByCoroutine() = runBlocking {
        // Test using CompletableFuture to concatenate strings asynchronously
        val input = "Hello"
        val result = withContext(Dispatchers.Default) {
            val part1 = async { input + " World" }
            part1.await()
        }
        assertEquals("Hello World", result)
    }
testHelloByMono
    @Test
    fun testHelloByMono() {
        // Test using Reactor Mono to concatenate strings asynchronously
        val input = "Hello"
        val monoResult = Mono.just(input)
            .map { it + " World" }
            .block()

        assertEquals("Hello World", monoResult)
    }
testHelloByCompletableFuture
    @Test
    fun testHelloByCompletableFuture() {
        // Test using CompletableFuture to concatenate strings asynchronously
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenApplyAsync { result -> result + " World" }

        val result = future.get()

        assertEquals("Hello World", result)
    }
testHelloByJavaStream
   @Test
    fun testHelloByJavaStream() {
        // Test using CompletableFuture with Java Streams to concatenate strings asynchronously
        val future = CompletableFuture.supplyAsync {
            listOf("Hello")
                .stream()
                .map { it + " World" }
                .findFirst()
                .orElse("")
        }
        val result = future.get()
        assertEquals("Hello World", result)
    }



(2) 동시성 처리 (Concurrency)

getDelayString()getDelayString2()는 각각 600ms, 500ms 지연 후 "SlowWorld1", "SlowWorld2"를 반환합니다.

이걸 동시처리 병렬로 돌려서:

Hello World SlowWorld1 SlowWorld2

라는 최종 문자열을 만들고 검증합니다.

방법별 특징:

  • Coroutine
    async를 3개 띄워 await로 병합

  • Mono
    zipWith으로 여러 Mono를 병렬 조합

  • Flux
    flatMap으로 병렬 작업 후 결과 모음

  • CompletableFuture
    thenCombineAsync로 결과 합성

  • Java Stream
    Stream 내부에서 Future join()해서 결과 병합

  • Akka Stream
    Source를 연결(zipWith)해서 결과를 스트림 합성


testConcurrentByCoroutine
    @Test
    fun testConcurrentByCoroutine() = runBlocking {
        val input = "Hello"
        val result = withContext(Dispatchers.Default) {
            val part1 = async { input + " World" }
            val part2 = async { getDelayString() }
            val part3 = async { getDelayString2() }
            "${part1.await()} ${part2.await()} ${part3.await()}"
        }

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }
testConcurrentByMono
    @Test
    fun testConcurrentByMono() {
        val input = "Hello"
        val monoResult = Mono.just(input)
            .map { it + " World" }
            .zipWith(Mono.fromCallable { getDelayString() })
            .zipWith(Mono.fromCallable { getDelayString2() }) { part1, part2 ->
                "${part1.t1} ${part1.t2} $part2"
            }
            .block()

        assertEquals("Hello World SlowWorld1 SlowWorld2", monoResult)
    }
testConcurrentByFlux
    @Test
    fun testConcurrentByFlux() {
        val input = listOf("Hello", "World")
        val fluxResult = Flux.fromIterable(input)
            .flatMap { data ->
                when (data) {
                    "Hello" -> Mono.fromCallable { getDelayString() }
                    "World" -> Mono.fromCallable { getDelayString2() }
                    else -> Mono.empty()
                }
            }
            .collectList()
            .map { results -> results.joinToString(" ") }
            .block()

        assertEquals("SlowWorld1 SlowWorld2", fluxResult)
    }
testConcurrentByCompletableFuture
    @Test
    fun testConcurrentByCompletableFuture() {
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenApplyAsync { result -> result + " World" }
            .thenCombineAsync(CompletableFuture.supplyAsync { getDelayString() }) { part1, part2 ->
                "$part1 $part2"
            }
            .thenCombineAsync(CompletableFuture.supplyAsync { getDelayString2() }) { part1, part3 ->
                "$part1 $part3"
            }

        val result = future.get()

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }
testConcurrentByJavsStream
    @Test
    fun testConcurrentByJavsStream() {
        val result = listOf("Hello")
            .stream()
            .map { input -> input + " World" }
            .map { part1 ->
                val part2 = CompletableFuture.supplyAsync { getDelayString() }.join()
                val part3 = CompletableFuture.supplyAsync { getDelayString2() }.join()
                "$part1 $part2 $part3"
            }
            .findFirst()
            .orElse("")

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }
testConcurrentByAkkaStream
    @Test
    fun testConcurrentByAkkaStream() {
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .map { it + " World" }
            .zipWith(Source.single(getDelayString())) { part1, part2 -> Pair(part1, part2) }
            .zipWith(Source.single(getDelayString2())) { pair, part3 ->
                "${pair.first} ${pair.second} $part3"
            }
            .runWith(Sink.head(), materializer)

        val result = source.toCompletableFuture().get()

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }




(3) 액터(Actor) 기반 메시지 송수신 테스트

HelloActor는 다음 특징을 갖습니다​HelloActor:

  • "Hello" 메시지를 받으면 "Kotlin" 응답을 보낸다.

  • HelloCommandHelloResponse를 주고받음

여기서 검증하는 것:

  • Ask Pattern을 통한 응답 받기

  • 다양한 방식 (Coroutine, WebFlux(Mono), CompletableFuture, Java Stream, Akka Stream)으로 액터에게 ask 보내고 응답받음

  • 결국 모두 "Kotlin" 이라는 응답 메시지를 받아야 성공

Pekko의 AskPattern.ask 메소드를 활용하고, Duration 타임아웃 설정도 적용함


testActorAskByCoroutine

testActorAskByCoroutine
    @Test
    fun testActorAskByCoroutine() = runBlocking {
        // Test using Actor Ask pattern with coroutines
        val actorRef = actorSystem.spawn(HelloActor.create())
        var response = AskPattern.ask(
            actorRef,
            { replyTo: ActorRef<HelloCommand> -> Hello("Hello", replyTo) },
            Duration.ofSeconds(3),
            actorSystem.scheduler()
        ).toCompletableFuture().await()

        assertEquals(HelloResponse("Kotlin"), response)
    }


testActorAskByWebflux

testActorAskByWebflux
    @Test
    fun testActorAskByWebflux() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val result = Mono.just("Hello")
            .flatMap { input ->
                Mono.fromFuture(
                    AskPattern.ask(
                        actorRef,
                        { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                        Duration.ofSeconds(3),
                        actorSystem.scheduler()
                    ).toCompletableFuture()
                )
            }
            .map { response -> (response as HelloResponse).message }
            .block()

        assertEquals("Kotlin", result)
    }



testActorAskByCompletableFuture

testActorAskByCompletableFuture
    @Test
    fun testActorAskByCompletableFuture() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenCompose { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .thenApply { response -> (response as HelloResponse).message }

        val result = future.get()
        assertEquals("Kotlin", result)
    }


testActorAskByJavaStream

testActorAskByJavaStream
    @Test
    fun testActorAskByJavaStream() {
        val actorRef = actorSystem.spawn(HelloActor.create())

        val result = listOf("Hello")
            .stream()
            .map { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture().join()
            }
            .map { response -> (response as HelloResponse).message }
            .findFirst()
            .orElse("")

        assertEquals("Kotlin", result)
    }


testActorAskByAkkaStream

testActorAskByAkkaStream
     @Test
    fun testActorAskByAkkaStream() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .mapAsync(1) { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .map { response -> (response as HelloResponse).message }
            .runWith(Sink.head(), materializer)

        val result = source.toCompletableFuture().get()
        assertEquals("Kotlin", result)
    } 


testActorAskByAkkaStreamWithCoroutine

testActorAskByAkkaStreamWithCoroutine
    @Test
    fun testActorAskByAkkaStreamWithCoroutine() = runBlocking {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .mapAsync(1) { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .map { response -> (response as HelloResponse).message }
            .runWith(Sink.head(), materializer)

        val result = source.await()
        assertEquals("Kotlin", result)
    }



(4) 코루틴 액터 (KHelloActor) 테스트

추가로 만든 **KHelloActor**는 별도 코루틴 기반 액터입니다.

특징:

  • 내부적으로 Channel로 메시지를 수신

  • 코루틴을 통해 메시지를 받아서 응답 채널로 결과 전송

  • Akka(Pekko)가 지원하는 Actor의 최소기능만 제공합니다.
  • JVM에서 지원하는 Thread보다 더 가벼운 경량화 실행모델

    • Java 21의 Virtual Thread 가 그러한 목적이며 스레드를 만들어야할 일이 생길때 대응할수 있습니다. -Java 20이하를 기본채택했을때

테스트 흐름:

  1. KHelloActor를 생성하고 start() (코루틴 Job 시작)

  2. Channel을 통해 KHello 메시지를 보냄

  3. 응답으로 KHelloResponse를 수신

  4. 결과를 검증한 후 리소스 정리 (closecancel)


testKHelloActor

testKHelloActor
    @Test
    fun testKHelloActor() = runBlocking {
        // KHelloActor 생성 및 시작
        val actor = KHelloActor()
        val replyChannel = Channel<KHelloResponse>()

        val job = actor.start() // start 메서드가 Job을 반환하도록 수정

        try {
            // 메시지 전송 및 응답 수신
            withTimeout(3000) { // 타임아웃 설정
                actor.send(KHello("Hello", replyChannel))
                val response = replyChannel.receive()

                // 응답 검증
                assertEquals("Kotlin", response.message)
            }
        } finally {
            // 리소스 정리
            replyChannel.close()
            actor.close()
            job.cancel() // 액터의 코루틴 종료
        }
    }




🧩 3. 관련 클래스/구성요소 요약


클래스설명
HelloActorPekko 기반의 기본 액터 (HelloCommand 수신, Kotlin 응답)
BulkProcessor(참조만) 대량 이벤트를 버퍼링하여 플러시하는 액터​BulkProcessor
HelloRouter라우팅 풀(RoundRobin)로 여러 액터에 분산 처리하는 액터​HelloRouter
KHelloActor코루틴 기반 간단한 액터 (Virtual Thread 스타일 대응 실험)

각 클래스에 대해서 별도 HelloActorTest.kt, BulkProcessorTest.kt, HelloRouterTest.kt 테스트 파일로도 검증을 하고 있었습니다​HelloActorTestBulkProcessorTestHelloRouterTest.


🧩 4. 요약

✅ 이 테스트 파일은
"비동기/동시성" 기술과 "액터모델" 메시지 송수신을 다양한 방법으로 실습하고 검증하는 포괄적 예제입니다.

Kotlin Coroutine, Java CompletableFuture, Reactor Mono/Flux, Akka Stream, Pekko Actor Ask Pattern 전부를 경험할 수 있도록 구성되어 있습니다.

✅ 특히 Kotlin 기반 경량 Actor 모델 (KHelloActor)은 앞으로 Virtual Thread를 고려한 차세대 패턴을 시도하고 있다는 점이 매우 흥미로운 부분입니다.


다음은 자바진영의 비동기 처리가 왜이렇게 많이 있고 복잡하나의 기원이 되는 내용으로 JVM영역의 다양한 IO를 다루기위해서는 결국 다양한 동시성처리 방법을 모두 익혀야하는 불행한 상황이된듯

Erik Meijer의 한마디~

"이보게, 브라이언 괴츠, C#,파이선,자바스크립트는 물론 심지어 PHP도 async, await를 지원하고 있다네. 그런 기능이 없는 언어는 자바일뿐이야.

람다를 이용해서 콜백함수를 사용하면 된다고? 천만에 콜백은 최악이야. 도움이 안된다고. 자바 9 버전에 담으려고 하는 걸 다 내려놓고 지금당장

async, await부터 넣으라고. 그래야 모두가 행복해질수 있어"

여기서 소개하는 전체코드는 다음경로에서 확인할수 있으며 유닛테스트를 통해 실행해볼수 있습니다.

참고자료



  • No labels