동시성처리와 병렬성처리는 유사해보이지만 큰 차이가있으며 간략하게 간략하게 설명될수 있습니다.



🧩 1. 전체 테스트 구조

이 파일은 코루틴, 리액터(Reactor), CompletableFuture, Java Stream, Akka Streams, 액터(Actor) 등을 사용해서
"비동기 동시성" 그리고 "액터(Actor) 기반 메시지 통신" 테스트를 다양한 방법으로 검증하는 것입니다.

구체적으로는:


구분내용
기초 비동기 처리Coroutine, Mono, CompletableFuture, Java Stream으로 "Hello World"를 만드는 간단한 동시성 테스트
병렬 실행 테스트3개의 작업(문자열 생성+지연 메소드 호출 2개)을 병렬로 수행하여 하나의 문자열로 합치는 테스트
Akka Stream 테스트Akka Streams를 사용하여 비동기 스트림 병합 테스트
Actor Ask 패턴 테스트HelloActor를 대상으로 ask (질문→응답받기) 패턴 테스트
Kotlin 기반 커스텀 KHelloActor 테스트별도로 작성한 코루틴 기반 액터 (KHelloActor) 테스트

🧩 2. 주요 테스트 코드 설명

(1) 기본 비동기 처리 (Hello → Hello World)


방법요약
Coroutineasync로 비동기 작업하고 await()로 결과 가져옴
Mono리액터 기반 Mono 스트림 처리
CompletableFuture자바 기본 비동기 API 활용
Java Stream스트림 map 연산으로 변환 처리 후 CompletableFuture

공통 목표: "Hello World" 문자열 생성 및 검증


    @Test
    fun testHelloByCoroutine() = runBlocking {
        // Test using CompletableFuture to concatenate strings asynchronously
        val input = "Hello"
        val result = withContext(Dispatchers.Default) {
            val part1 = async { input + " World" }
            part1.await()
        }
        assertEquals("Hello World", result)
    }
    @Test
    fun testHelloByMono() {
        // Test using Reactor Mono to concatenate strings asynchronously
        val input = "Hello"
        val monoResult = Mono.just(input)
            .map { it + " World" }
            .block()

        assertEquals("Hello World", monoResult)
    }
    @Test
    fun testHelloByCompletableFuture() {
        // Test using CompletableFuture to concatenate strings asynchronously
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenApplyAsync { result -> result + " World" }

        val result = future.get()

        assertEquals("Hello World", result)
    }
   @Test
    fun testHelloByJavaStream() {
        // Test using CompletableFuture with Java Streams to concatenate strings asynchronously
        val future = CompletableFuture.supplyAsync {
            listOf("Hello")
                .stream()
                .map { it + " World" }
                .findFirst()
                .orElse("")
        }
        val result = future.get()
        assertEquals("Hello World", result)
    }



(2) 동시성 처리 (Concurrency)

getDelayString()getDelayString2()는 각각 600ms, 500ms 지연 후 "SlowWorld1", "SlowWorld2"를 반환합니다.

이걸 병렬로 돌려서:

Hello World SlowWorld1 SlowWorld2

라는 최종 문자열을 만들고 검증합니다.

방법별 특징:


    @Test
    fun testConcurrentByCoroutine() = runBlocking {
        val input = "Hello"
        val result = withContext(Dispatchers.Default) {
            val part1 = async { input + " World" }
            val part2 = async { getDelayString() }
            val part3 = async { getDelayString2() }
            "${part1.await()} ${part2.await()} ${part3.await()}"
        }

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }
    @Test
    fun testConcurrentByMono() {
        val input = "Hello"
        val monoResult = Mono.just(input)
            .map { it + " World" }
            .zipWith(Mono.fromCallable { getDelayString() })
            .zipWith(Mono.fromCallable { getDelayString2() }) { part1, part2 ->
                "${part1.t1} ${part1.t2} $part2"
            }
            .block()

        assertEquals("Hello World SlowWorld1 SlowWorld2", monoResult)
    }
    @Test
    fun testConcurrentByFlux() {
        val input = listOf("Hello", "World")
        val fluxResult = Flux.fromIterable(input)
            .flatMap { data ->
                when (data) {
                    "Hello" -> Mono.fromCallable { getDelayString() }
                    "World" -> Mono.fromCallable { getDelayString2() }
                    else -> Mono.empty()
                }
            }
            .collectList()
            .map { results -> results.joinToString(" ") }
            .block()

        assertEquals("SlowWorld1 SlowWorld2", fluxResult)
    }
    @Test
    fun testConcurrentByCompletableFuture() {
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenApplyAsync { result -> result + " World" }
            .thenCombineAsync(CompletableFuture.supplyAsync { getDelayString() }) { part1, part2 ->
                "$part1 $part2"
            }
            .thenCombineAsync(CompletableFuture.supplyAsync { getDelayString2() }) { part1, part3 ->
                "$part1 $part3"
            }

        val result = future.get()

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }
    @Test
    fun testConcurrentByJavsStream() {
        val result = listOf("Hello")
            .stream()
            .map { input -> input + " World" }
            .map { part1 ->
                val part2 = CompletableFuture.supplyAsync { getDelayString() }.join()
                val part3 = CompletableFuture.supplyAsync { getDelayString2() }.join()
                "$part1 $part2 $part3"
            }
            .findFirst()
            .orElse("")

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }
    @Test
    fun testConcurrentByAkkaStream() {
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .map { it + " World" }
            .zipWith(Source.single(getDelayString())) { part1, part2 -> Pair(part1, part2) }
            .zipWith(Source.single(getDelayString2())) { pair, part3 ->
                "${pair.first} ${pair.second} $part3"
            }
            .runWith(Sink.head(), materializer)

        val result = source.toCompletableFuture().get()

        assertEquals("Hello World SlowWorld1 SlowWorld2", result)
    }




(3) 액터(Actor) 기반 메시지 송수신 테스트

HelloActor는 다음 특징을 갖습니다​HelloActor:

여기서 검증하는 것:

Pekko의 AskPattern.ask 메소드를 활용하고, Duration 타임아웃 설정도 적용함


    @Test
    fun testActorAskByCoroutine() = runBlocking {
        // Test using Actor Ask pattern with coroutines
        val actorRef = actorSystem.spawn(HelloActor.create())
        var response = AskPattern.ask(
            actorRef,
            { replyTo: ActorRef<HelloCommand> -> Hello("Hello", replyTo) },
            Duration.ofSeconds(3),
            actorSystem.scheduler()
        ).toCompletableFuture().await()

        assertEquals(HelloResponse("Kotlin"), response)
    }
    @Test
    fun testActorAskByWebflux() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val result = Mono.just("Hello")
            .flatMap { input ->
                Mono.fromFuture(
                    AskPattern.ask(
                        actorRef,
                        { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                        Duration.ofSeconds(3),
                        actorSystem.scheduler()
                    ).toCompletableFuture()
                )
            }
            .map { response -> (response as HelloResponse).message }
            .block()

        assertEquals("Kotlin", result)
    }
    @Test
    fun testActorAskByCompletableFuture() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val future = CompletableFuture.supplyAsync { "Hello" }
            .thenCompose { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .thenApply { response -> (response as HelloResponse).message }

        val result = future.get()
        assertEquals("Kotlin", result)
    }
    @Test
    fun testActorAskByJavaStream() {
        val actorRef = actorSystem.spawn(HelloActor.create())

        val result = listOf("Hello")
            .stream()
            .map { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture().join()
            }
            .map { response -> (response as HelloResponse).message }
            .findFirst()
            .orElse("")

        assertEquals("Kotlin", result)
    }
    @Test
    fun testActorAskByAkkaStream() {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .mapAsync(1) { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .map { response -> (response as HelloResponse).message }
            .runWith(Sink.head(), materializer)

        val result = source.toCompletableFuture().get()
        assertEquals("Kotlin", result)
    }
    @Test
    fun testActorAskByAkkaStreamWithCoroutine() = runBlocking {
        val actorRef = actorSystem.spawn(HelloActor.create())
        val materializer = Materializer.createMaterializer(actorSystem.system())

        val source = Source.single("Hello")
            .mapAsync(1) { input ->
                AskPattern.ask(
                    actorRef,
                    { replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
                    Duration.ofSeconds(3),
                    actorSystem.scheduler()
                ).toCompletableFuture()
            }
            .map { response -> (response as HelloResponse).message }
            .runWith(Sink.head(), materializer)

        val result = source.await()
        assertEquals("Kotlin", result)
    }



(4) 코루틴 액터 (KHelloActor) 테스트

추가로 만든 **KHelloActor**는 별도 코루틴 기반 액터입니다.

특징:

테스트 흐름:

  1. KHelloActor를 생성하고 start() (코루틴 Job 시작)

  2. Channel을 통해 KHello 메시지를 보냄

  3. 응답으로 KHelloResponse를 수신

  4. 결과를 검증한 후 리소스 정리 (closecancel)


    @Test
    fun testKHelloActor() = runBlocking {
        // KHelloActor 생성 및 시작
        val actor = KHelloActor()
        val replyChannel = Channel<KHelloResponse>()

        val job = actor.start() // start 메서드가 Job을 반환하도록 수정

        try {
            // 메시지 전송 및 응답 수신
            withTimeout(3000) { // 타임아웃 설정
                actor.send(KHello("Hello", replyChannel))
                val response = replyChannel.receive()

                // 응답 검증
                assertEquals("Kotlin", response.message)
            }
        } finally {
            // 리소스 정리
            replyChannel.close()
            actor.close()
            job.cancel() // 액터의 코루틴 종료
        }
    }




🧩 3. 관련 클래스/구성요소 요약


클래스설명
HelloActorPekko 기반의 기본 액터 (HelloCommand 수신, Kotlin 응답)
BulkProcessor(참조만) 대량 이벤트를 버퍼링하여 플러시하는 액터​BulkProcessor
HelloRouter라우팅 풀(RoundRobin)로 여러 액터에 분산 처리하는 액터​HelloRouter
KHelloActor코루틴 기반 간단한 액터 (Virtual Thread 스타일 대응 실험)

각 클래스에 대해서 별도 HelloActorTest.kt, BulkProcessorTest.kt, HelloRouterTest.kt 테스트 파일로도 검증을 하고 있었습니다​HelloActorTestBulkProcessorTestHelloRouterTest.


🧩 4. 요약

✅ 이 테스트 파일은
"비동기/동시성" 기술과 "액터모델" 메시지 송수신을 다양한 방법으로 실습하고 검증하는 포괄적 예제입니다.

Kotlin Coroutine, Java CompletableFuture, Reactor Mono/Flux, Akka Stream, Pekko Actor Ask Pattern 전부를 경험할 수 있도록 구성되어 있습니다.

✅ 특히 Kotlin 기반 경량 Actor 모델 (KHelloActor)은 앞으로 Virtual Thread를 고려한 차세대 패턴을 시도하고 있다는 점이 매우 흥미로운 부분입니다.


전체코드