Page History
...
- OpenAI가 채택해 유명해진 분산처리 프레임워크인 Ray의 Core 영역이 왜 액터모델을 채택하고 있는지 주시할 필요가 있습니다.
- 코틀린에서 채택한 코루틴 코어도 액터모델을 기본으로 포함하고 있으며, 액터모델을 중심으로 Reactor(리액티브 스트림의 구현체)로 확장해 나가는 기술을 주시할 필요가 있습니다.
구현
- AKKA의 액터와 구분하기 위해 KActor(코액터)로 네이밍 (K-컨텐츠 아님)
- 송수신 Typed를 사용하고 비헤이버 패턴 채택
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.kactor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
/**
 * Closed set of messages that can be posted to [HelloKActor].
 */
sealed class HelloKActorCommand

/**
 * Greeting request.
 *
 * @property message text inspected by the actor to decide how to react.
 * @property replyTo deferred on which the actor posts its answer.
 */
data class Hello(
    val message: String,
    val replyTo: kotlinx.coroutines.CompletableDeferred<HelloKActorResponse>
) : HelloKActorCommand()

/**
 * Query for the actor's current hello counter.
 *
 * @property replyTo deferred on which the actor posts the counter value.
 */
data class GetHelloCount(
    val replyTo: kotlinx.coroutines.CompletableDeferred<HelloKActorResponse>
) : HelloKActorCommand()
/**
 * Closed set of replies produced by [HelloKActor].
 */
sealed class HelloKActorResponse

/**
 * Reply to a [Hello] command.
 *
 * @property message text returned to the requester.
 */
data class HelloResponse(
    val message: String
) : HelloKActorResponse()

/**
 * Reply to a [GetHelloCount] command.
 *
 * @property count value of the actor's hello counter at the time of the query.
 */
data class HelloCountResponse(
    val count: Int
) : HelloKActorResponse()
/**
 * Minimal coroutine-based actor that handles [HelloKActorCommand] messages one at a time.
 *
 * Commands posted through [send] travel over a channel and are consumed by a single
 * coroutine launched on [Dispatchers.Default]. The hello counter is read and written
 * only from that consumer loop.
 */
class HelloKActor {
    // Mailbox feeding the consumer loop; created with the default channel capacity.
    private val channel = Channel<HelloKActorCommand>()

    // Number of "Hello" messages handled so far.
    private var helloCount = 0

    // Scope owning the consumer coroutine; cancelled by stop().
    private val scope = CoroutineScope(Dispatchers.Default)

    init {
        scope.launch {
            for (command in channel) {
                // Exhaustive over the sealed command hierarchy, hence no else branch.
                when (command) {
                    is Hello -> handleHello(command)
                    is GetHelloCount -> handleGetHelloCount(command)
                }
            }
        }
    }

    /**
     * Reacts to a [Hello] command.
     *
     * - `"Hello"`: increments the counter and replies with `HelloResponse("Kotlin")`.
     * - `"InvalidMessage"`: fails `replyTo` with a [RuntimeException] carrying
     *   "Invalid message received!".
     * - anything else: ignored; `replyTo` is left uncompleted.
     */
    private fun handleHello(command: Hello) {
        when (command.message) {
            "Hello" -> {
                helloCount++
                command.replyTo.complete(HelloResponse("Kotlin"))
            }
            // Deliver the failure to the requester instead of throwing. An exception
            // escaping from here would end the consumer loop and leave replyTo pending,
            // so the requester could only ever observe a timeout.
            "InvalidMessage" ->
                command.replyTo.completeExceptionally(RuntimeException("Invalid message received!"))
            else -> Unit
        }
    }

    /** Replies to [GetHelloCount] with the current counter value. */
    private fun handleGetHelloCount(command: GetHelloCount) {
        command.replyTo.complete(HelloCountResponse(helloCount))
    }

    /**
     * Posts [command] to the actor's mailbox, suspending as dictated by the channel.
     */
    suspend fun send(command: HelloKActorCommand) {
        channel.send(command)
    }

    /** Cancels the actor's scope, which stops the consumer loop. */
    fun stop() {
        scope.cancel()
    }
} |
- AKKA의 액터와 구분하기 위해 KActor(코액터)로 네이밍 (K-컨텐츠 아님)
- 송수신 Typed를 사용하고 비헤이버 패턴 채택
테스트
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.kactor

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

/**
 * Exercises [HelloKActor] by posting commands and awaiting the deferred replies,
 * each wait bounded by [TIMEOUT_DURATION].
 */
class HelloKActorTest {
    companion object {
        private const val TIMEOUT_DURATION = 3000L // Timeout duration in milliseconds
    }

    // Actor under test; stopped after every test in tearDown().
    private val actor = HelloKActor()

    @AfterEach
    fun tearDown() {
        actor.stop()
    }

    /** A "Hello" message must be answered with a [HelloResponse] carrying "Kotlin". */
    @Test
    fun testHelloCommand() = runBlocking {
        val response = CompletableDeferred<HelloKActorResponse>()
        actor.send(Hello("Hello", response))
        val result = withTimeout(TIMEOUT_DURATION) { response.await() } as HelloResponse
        assertEquals("Kotlin", result.message, "The response message should be 'Kotlin'")
    }

    /** After one handled "Hello", the reported count must be 1. */
    @Test
    fun testGetHelloCountCommand() = runBlocking {
        val response1 = CompletableDeferred<HelloKActorResponse>()
        actor.send(Hello("Hello", response1))
        withTimeout(TIMEOUT_DURATION) { response1.await() }

        val response2 = CompletableDeferred<HelloKActorResponse>()
        actor.send(GetHelloCount(response2))
        val result = withTimeout(TIMEOUT_DURATION) { response2.await() } as HelloCountResponse
        assertEquals(1, result.count, "The hello count should be 1")
    }

    /**
     * Posts an "InvalidMessage" command and checks the message of any
     * [RuntimeException] raised while sending.
     */
    @Test
    fun testInvalidMessage() {
        val response = CompletableDeferred<HelloKActorResponse>()
        // NOTE(review): `response` is never awaited, and nothing is asserted when no
        // exception is thrown, so this test also passes if send() returns normally.
        try {
            runBlocking {
                withTimeout(TIMEOUT_DURATION) {
                    actor.send(Hello("InvalidMessage", response))
                }
            }
        } catch (e: RuntimeException) {
            assertEquals(
                "Invalid message received!",
                e.message,
                "The exception message should be 'Invalid message received!'"
            )
        }
    }
} |
- Akka TestKit(테스트 툴킷)을 이용하는 경우 테스트 중단 없는 관찰자(수신 검증) 기능을 제공하며, 액터의 수신 검증은 기본적으로 위와 같은 코드로 가능합니다.
- 코틀린 모드에서도 관련 테스트 툴을 준비 중입니다.