Page History
링크 시뮬레이션 : https://famous-caramel-e50cb4.netlify.app/
- 요즘 유행하는 코드생성기 Bolt로 이 개념을 설명하기위해 작성해보았습니다.
- 한명이 모든일을 하는 바리스타와 주문하는사람은 따로있고 바리스타가 분리되어 운영하는 커피숍
동시성처리와 병렬성처리는 유사해보이지만 큰 차이가있으며 다음과같이 간략하게 설명될수 있습니다.
- 동시성 동시처리 : 한사람이 커피주문 받고 만들기를 동시에한다. 커피머신을 이용하면서도 주문을 받음 - 비동기 논블록킹 프로그래밍
- 주의 : 블락킹이 하나라도 발생하면 전체가 함께 지연됨
- 병렬성 병렬처리 : 커피주문을 받는사람과 만드는 사람이 각각 따로 있다. - 병렬처리 멀티스레드 프로그래밍
- 주의 : 블락킹이 발생하면 특정 컨텍스트 담당영역만 지연되지만 작업자 하나에 블락킹이 발생해도 다른작업가 작업을 진행하는것에 영향을 주지않지만 작업처리간 자원 공유문제를 잘 해결하지 못하면 데드락및 궁핍현상이 궁핍현상등이 발생해 작업이 전체작업이 중단될수도 있음있습니다.
- 순차성순차처리(동기순차 블락킹 프로그래밍) : 한사람이 커피주문을 받고 커피를 만들지만~ 커피가 완성될때까지 다음손님 주문을 받지 못한다.
- 동시/병렬로 진행되지 않음으로 전체 소요시간은 각 Task의 전체합으로 전체완료시간이 가장느림가장느리며
- 병렬프로세싱은 가장빠른 전체완료시간을 보장할수 있지만, 동시성처리는 더 적은 자원으로 병렬프로세싱과 유사한 효율성을 가질수 있습니다.
JVM(자바+코틀린) 영역이 지원하는 동시성 프로그래밍 방식을 먼저 알아보고 액터모델에 이벤트를 전송해 결과값을 받는 ASK패턴도 알아보겠습니다.
🧩 1. 전체 테스트 구조
이 파일은 코루틴, 리액터(Reactor), CompletableFuture, Java Stream, Akka Streams, 액터(Actor) 등을 사용해서
"비동기 동시성" 그리고 "액터(Actor) 기반 메시지 통신" 테스트를 다양한 방법으로 검증하는 것입니다.
...
| 구분 | 내용 |
|---|---|
| 기초 비동기 처리 | Coroutine, Mono, CompletableFuture, Java Stream으로 "Hello World"를 만드는 간단한 동시성 테스트 |
| 병렬 실행 테스트 | 3개의 작업(문자열 생성+지연 메소드 호출 2개)을 병렬로 수행하여 하나의 문자열로 합치는 테스트 |
| Akka Stream 테스트 | Akka Streams를 사용하여 비동기 스트림 병합 테스트 |
| Actor Ask 패턴 테스트 | HelloActor를 대상으로 ask (질문→응답받기) 패턴 테스트 |
| Kotlin 기반 커스텀 KHelloActor 테스트 | 별도로 작성한 코루틴 기반 액터 (KHelloActor) 테스트 |
...
🧩 2. 주요 테스트 코드 설명
(1) 기본 비동기 처리 (Hello → Hello World)
| 방법 | 요약 |
|---|---|
Coroutine | async로 비동기 작업하고 await()로 결과 가져옴 |
Mono | 리액터 기반 Mono 스트림 처리 |
CompletableFuture | 자바 기본 비동기 API 활용 |
Java Stream | 스트림 map 연산으로 변환 처리 후 CompletableFuture |
...
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testHelloByJavaStream() {
// Test using CompletableFuture with Java Streams to concatenate strings asynchronously
val future = CompletableFuture.supplyAsync {
listOf("Hello")
.stream()
.map { it + " World" }
.findFirst()
.orElse("")
}
val result = future.get()
assertEquals("Hello World", result)
} |
...
(2) 동시성 처리 (Concurrency)
getDelayString()와 getDelayString2()는 각각 600ms, 500ms 지연 후 "SlowWorld1", "SlowWorld2"를 반환합니다.
이걸 동시처리 병렬로 돌려서:
Hello World SlowWorld1 SlowWorld2
...
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testConcurrentByAkkaStream() {
val materializer = Materializer.createMaterializer(actorSystem.system())
val source = Source.single("Hello")
.map { it + " World" }
.zipWith(Source.single(getDelayString())) { part1, part2 -> Pair(part1, part2) }
.zipWith(Source.single(getDelayString2())) { pair, part3 ->
"${pair.first} ${pair.second} $part3"
}
.runWith(Sink.head(), materializer)
val result = source.toCompletableFuture().get()
assertEquals("Hello World SlowWorld1 SlowWorld2", result)
} |
...
(3) 액터(Actor) 기반 메시지 송수신 테스트
HelloActor는 다음 특징을 갖습니다HelloActor:
...
Pekko의
AskPattern.ask메소드를 활용하고,Duration타임아웃 설정도 적용함
testActorAskByCoroutine
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testActorAskByCoroutine() = runBlocking {
// Test using Actor Ask pattern with coroutines
val actorRef = actorSystem.spawn(HelloActor.create())
var response = AskPattern.ask(
actorRef,
{ replyTo: ActorRef<HelloCommand> -> Hello("Hello", replyTo) },
Duration.ofSeconds(3),
actorSystem.scheduler()
).toCompletableFuture().await()
assertEquals(HelloResponse("Kotlin"), response)
} |
testActorAskByWebflux
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testActorAskByWebflux() {
val actorRef = actorSystem.spawn(HelloActor.create())
val result = Mono.just("Hello")
.flatMap { input ->
Mono.fromFuture(
AskPattern.ask(
actorRef,
{ replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
Duration.ofSeconds(3),
actorSystem.scheduler()
).toCompletableFuture()
)
}
.map { response -> (response as HelloResponse).message }
.block()
assertEquals("Kotlin", result)
} |
testActorAskByCompletableFuture
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testActorAskByCompletableFuture() {
val actorRef = actorSystem.spawn(HelloActor.create())
val future = CompletableFuture.supplyAsync { "Hello" }
.thenCompose { input ->
AskPattern.ask(
actorRef,
{ replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
Duration.ofSeconds(3),
actorSystem.scheduler()
).toCompletableFuture()
}
.thenApply { response -> (response as HelloResponse).message }
val result = future.get()
assertEquals("Kotlin", result)
} |
testActorAskByJavaStream
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testActorAskByJavaStream() {
val actorRef = actorSystem.spawn(HelloActor.create())
val result = listOf("Hello")
.stream()
.map { input ->
AskPattern.ask(
actorRef,
{ replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
Duration.ofSeconds(3),
actorSystem.scheduler()
).toCompletableFuture().join()
}
.map { response -> (response as HelloResponse).message }
.findFirst()
.orElse("")
assertEquals("Kotlin", result)
} |
testActorAskByAkkaStream
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testActorAskByAkkaStream() {
val actorRef = actorSystem.spawn(HelloActor.create())
val materializer = Materializer.createMaterializer(actorSystem.system())
val source = Source.single("Hello")
.mapAsync(1) { input ->
AskPattern.ask(
actorRef,
{ replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
Duration.ofSeconds(3),
actorSystem.scheduler()
).toCompletableFuture()
}
.map { response -> (response as HelloResponse).message }
.runWith(Sink.head(), materializer)
val result = source.toCompletableFuture().get()
assertEquals("Kotlin", result)
} |
testActorAskByAkkaStreamWithCoroutine
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testActorAskByAkkaStreamWithCoroutine() = runBlocking {
val actorRef = actorSystem.spawn(HelloActor.create())
val materializer = Materializer.createMaterializer(actorSystem.system())
val source = Source.single("Hello")
.mapAsync(1) { input ->
AskPattern.ask(
actorRef,
{ replyTo: ActorRef<HelloCommand> -> Hello(input, replyTo) },
Duration.ofSeconds(3),
actorSystem.scheduler()
).toCompletableFuture()
}
.map { response -> (response as HelloResponse).message }
.runWith(Sink.head(), materializer)
val result = source.await()
assertEquals("Kotlin", result)
} |
...
(4) 코루틴 액터 (KHelloActor) 테스트
추가로 만든 **KHelloActor**는 별도 코루틴 기반 액터입니다.
...
KHelloActor를 생성하고start()(코루틴 Job 시작)Channel을 통해KHello메시지를 보냄응답으로
KHelloResponse를 수신결과를 검증한 후 리소스 정리 (
close및cancel)
testKHelloActor
| Code Block | ||||
|---|---|---|---|---|
| ||||
@Test
fun testKHelloActor() = runBlocking {
// KHelloActor 생성 및 시작
val actor = KHelloActor()
val replyChannel = Channel<KHelloResponse>()
val job = actor.start() // start 메서드가 Job을 반환하도록 수정
try {
// 메시지 전송 및 응답 수신
withTimeout(3000) { // 타임아웃 설정
actor.send(KHello("Hello", replyChannel))
val response = replyChannel.receive()
// 응답 검증
assertEquals("Kotlin", response.message)
}
} finally {
// 리소스 정리
replyChannel.close()
actor.close()
job.cancel() // 액터의 코루틴 종료
}
} |
...
🧩 3. 관련 클래스/구성요소 요약
| 클래스 | 설명 |
|---|---|
HelloActor | Pekko 기반의 기본 액터 (HelloCommand 수신, Kotlin 응답) |
BulkProcessor | (참조만) 대량 이벤트를 버퍼링하여 플러시하는 액터BulkProcessor |
HelloRouter | 라우팅 풀(RoundRobin)로 여러 액터에 분산 처리하는 액터HelloRouter |
KHelloActor | 코루틴 기반 간단한 액터 (Virtual Thread 스타일 대응 실험) |
각 클래스에 대해서 별도 HelloActorTest.kt, BulkProcessorTest.kt, HelloRouterTest.kt 테스트 파일로도 검증을 하고 있었습니다HelloActorTestBulkProcessorTestHelloRouterTest.
...
🧩 4. 요약
✅ 이 테스트 파일은
"비동기/동시성" 기술과 "액터모델" 메시지 송수신을 다양한 방법으로 실습하고 검증하는 포괄적 예제입니다.
...
여기서 소개하는 전체코드는 다음경로에서 확인할수 있으며 유닛테스트를 통해 실행해볼수 있습니다.
전체코드
참고자료
- 작성코드 - 코틀린버전
- Terminology -여기서는 동시성처리만 설명되었으며 병렬처리 프로그래밍과 함께 설명합니다. https://github.com/psmon/kopring-reactive-labs/blob/main/KotlinBootReactiveLabs/src/test/kotlin/org/example/kotlinbootreactivelabs/actor/guide/ConcurrentTest.kt


