Spring Boot에 Pekko(Akka)를 탑재하면, 양쪽 모두 Reactive Streams 인터페이스를 따르므로 이를 공유하며 상호 운영할 수 있습니다. 웹 요청의 동시성 처리는 주로 WebFlux가 동작하는 코루틴에서 수행하며 100% 동시성(논블로킹) 프로그래밍을 지향하게 되겠지만, 그래도 스레드 모델이나 블로킹을 요구하는 작업이 필요할 때는 별도의 스레드 Dispatcher로 동작하는 ActorSystem으로 해당 부하를 분리하는 전략을 선택할 수도 있습니다. WebFlux는 코드 전체를 동시성 프로그래밍으로 작성하는 것이 권장되지만, 액터 모델은 동시성과 병렬성 프로그래밍을 함께 운영할 수 있습니다.


여기서는 코프링(Kotlin + Spring)에 액터 시스템을 자연스럽게 탑재하는 방법을 알아보겠습니다.

의존구성

// Build plugins. Every Kotlin Gradle plugin is pinned to the same release:
// they are published together and are expected to share one version, so
// kapt must not be on a different Kotlin line than the jvm/spring plugins.
plugins {
    kotlin("jvm") version "1.9.25"
    kotlin("kapt") version "1.9.25"
    kotlin("plugin.spring") version "1.9.25"
    id("org.springframework.boot") version "3.3.4"
    id("io.spring.dependency-management") version "1.1.6"
}
// Maven coordinates of this module.
group = "org.example"
version = "0.0.1-SNAPSHOT"

// Build and run with a Java 17 toolchain.
java.toolchain.languageVersion.set(JavaLanguageVersion.of(17))

// Make annotation-processor dependencies visible on the compile-only classpath as well.
configurations.compileOnly.configure {
    extendsFrom(configurations.annotationProcessor.get())
}

// Resolve all dependencies from Maven Central.
repositories.mavenCentral()
// Library versions referenced by the dependency declarations below.
val swaggerversion = "2.2.0"
val scalaVersion = "2.13" // Scala binary suffix appended to every Pekko artifact id
val pekkoVersion = "1.1.2"
val pekkoR2dbcVersion = "1.0.0" // pekko-persistence-r2dbc uses its own version, not pekkoVersion
val jwtversion = "0.11.5"
val jacksonversion = "2.15.2"
val commonModelVersion = "0.0.1-SNAPSHOT"

dependencies {
    // External module
    implementation("org.example.labs:common-model:$commonModelVersion")

    // Jackson, JWT, WebFlux
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonversion")
    implementation("io.jsonwebtoken:jjwt-api:$jwtversion")
    implementation("io.jsonwebtoken:jjwt-impl:$jwtversion")
    implementation("io.jsonwebtoken:jjwt-jackson:$jwtversion")
    implementation("org.springframework.boot:spring-boot-starter-webflux")

    // Annotation processor
    // NOTE(review): Kotlin sources are normally processed through kapt("...") rather than
    // annotationProcessor("...") — confirm whether configuration metadata is actually generated.
    annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")

    // Kotlin / coroutines support
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
    implementation("org.springframework.boot:spring-boot-starter-actuator")

    // Logging
    implementation("ch.qos.logback:logback-classic")

    // Kafka Streams
    implementation("org.apache.kafka:kafka-streams")

    // Actor System (BOM keeps the Pekko modules on one version)
    implementation(platform("org.apache.pekko:pekko-bom_$scalaVersion:$pekkoVersion"))

    // Classic Actor
    // Classic actors are not used; this project uses typed actors built with the Behavior API.
    //implementation("org.apache.pekko:pekko-actor_$scalaVersion:$pekkoVersion")

    // Streams: HelloStateActor uses Source/Sink/Materializer directly, so the module is
    // declared explicitly instead of being left to transitive resolution.
    implementation("org.apache.pekko:pekko-stream_$scalaVersion:$pekkoVersion")

    // Typed Actor
    implementation("org.apache.pekko:pekko-actor-typed_$scalaVersion:$pekkoVersion")
    implementation("org.apache.pekko:pekko-serialization-jackson_$scalaVersion:$pekkoVersion")

    // Actor Persistence
    implementation("org.apache.pekko:pekko-persistence-typed_$scalaVersion:$pekkoVersion")
    implementation("org.apache.pekko:pekko-persistence-query_$scalaVersion:$pekkoVersion")
    implementation("org.apache.pekko:pekko-persistence-r2dbc_$scalaVersion:$pekkoR2dbcVersion")

    // Actor Logging
    implementation("org.apache.pekko:pekko-slf4j_$scalaVersion:$pekkoVersion")

    // Actor TestKit
    testImplementation("org.apache.pekko:pekko-testkit_$scalaVersion:$pekkoVersion")
    testImplementation("org.apache.pekko:pekko-actor-testkit-typed_$scalaVersion:$pekkoVersion")
    testImplementation("org.apache.pekko:pekko-stream-testkit_$scalaVersion:$pekkoVersion")
    testImplementation("org.apache.pekko:pekko-persistence-testkit_$scalaVersion:$pekkoVersion")

    // Development tooling
    developmentOnly("org.springframework.boot:spring-boot-devtools")

    // Test
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
    testRuntimeOnly("org.junit.platform:junit-platform-launcher")
    // No explicit JUnit version: Spring Boot's dependency management supplies it, which keeps
    // Jupiter API/engine aligned with the (also managed) junit-platform-launcher above.
    testImplementation("org.junit.jupiter:junit-jupiter-api")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
    testImplementation("com.squareup.okhttp3:okhttp:4.12.0")

    // Swagger / OpenAPI UI
    implementation("org.springdoc:springdoc-openapi-starter-webflux-ui:$swaggerversion")
}
// Compiler flag: treat JSR-305 nullability annotations strictly.
kotlin {
    compilerOptions {
        freeCompilerArgs.add("-Xjsr305=strict")
    }
}

// Run every Test task on the JUnit Platform.
tasks.withType(Test::class.java) {
    useJUnitPlatform()
}



AkkaConfig

package org.example.kotlinbootreactivelabs.config
import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import org.apache.pekko.actor.typed.ActorSystem
import org.example.kotlinbootreactivelabs.actor.MainStageActor
import org.example.kotlinbootreactivelabs.actor.MainStageActorCommand
import org.example.kotlinbootreactivelabs.actor.hellostate.HelloState
import org.example.kotlinbootreactivelabs.actor.hellostate.HelloStateActor
import org.example.kotlinbootreactivelabs.actor.hellostate.HelloStateActorCommand
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Configuration
/**
 * Spring configuration that owns the Pekko [ActorSystem] whose guardian is [HelloStateActor].
 *
 * The system is created after the bean has been constructed ([init]) and is
 * terminated when the Spring context shuts down ([shutdown]).
 */
@Configuration
class AkkaConfiguration {
    private val logger: org.slf4j.Logger = LoggerFactory.getLogger(AkkaConfiguration::class.java)

    // Assigned in init(); lateinit because Spring invokes @PostConstruct after construction.
    private lateinit var helloState: ActorSystem<HelloStateActorCommand>

    /** Starts the actor system with [HelloStateActor] in the [HelloState.HAPPY] state. */
    @PostConstruct
    fun init() {
        logger.info("Starting Akka Actor System")
        helloState = ActorSystem.create(HelloStateActor.create(HelloState.HAPPY), "HelloStateActor")
    }

    /** Terminates the actor system; a no-op when [init] never completed. */
    @PreDestroy
    fun shutdown() {
        if (::helloState.isInitialized) {
            helloState.terminate()
        }
    }

    /** @return the running actor system, usable as the recipient of [HelloStateActorCommand]s. */
    fun getHelloState(): ActorSystem<HelloStateActorCommand> = helloState

    /** @return the scheduler of the same actor system, as required by the ask pattern. */
    fun getScheduler(): org.apache.pekko.actor.typed.Scheduler = helloState.scheduler()
}
  • 액터 객체 자체를 DI(Bean)로 등록할 수도 있지만, 위와 같이 설정 클래스에서 액터 시스템을 직접 생성하고 getter로 제공하도록 구성할 수도 있습니다.



액터작성

/** Commands that HelloStateActor can process. */
sealed class HelloStateActorCommand

/** Greeting request; the reply is sent to [replyTo]. */
data class Hello(
    val message: String,
    val replyTo: ActorRef<Any>,
) : HelloStateActorCommand()

/** Asks for the current (periodically reset) hello count. */
data class GetHelloCount(
    val replyTo: ActorRef<Any>,
) : HelloStateActorCommand()

/** Asks for the total hello count. */
data class GetHelloTotalCount(
    val replyTo: ActorRef<Any>,
) : HelloStateActorCommand()

/** Switches the actor to [newHelloState]. */
data class ChangeState(
    val newHelloState: HelloState,
) : HelloStateActorCommand()

/** Greeting request that is rate-limited before it is handled as a [Hello]. */
data class HelloLimit(
    val message: String,
    val replyTo: ActorRef<Any>,
) : HelloStateActorCommand()

/** Timer tick that resets the hello count. */
object ResetHelloCount : HelloStateActorCommand()

/** Cancels the periodic [ResetHelloCount] timer. */
object StopResetTimer : HelloStateActorCommand()
/** Responses that HelloStateActor can send back. */
sealed class HelloStateActorResponse

/** Textual reply to a greeting. */
data class HelloResponse(
    val message: String,
) : HelloStateActorResponse()

/** Reply carrying a hello counter value. */
data class HelloCountResponse(
    val count: Int,
) : HelloStateActorResponse()
/** States the actor can be in. */
enum class HelloState {
    HAPPY,
    ANGRY,
}
/**
 * Typed actor that answers greetings according to its current [HelloState].
 *
 * - Accepted "Hello" greetings increment `helloCount` and `helloTotalCount`;
 *   a timer sends [ResetHelloCount] every 10 seconds, which zeroes `helloCount` only.
 * - [HelloLimit] commands go through a queue throttled to 3 elements per second
 *   (buffer size 100) and come back to this actor as [Hello].
 *
 * Instances are created through [create] only.
 */
class HelloStateActor private constructor(
    private val context: ActorContext<HelloStateActorCommand>,
    private val timers: TimerScheduler<HelloStateActorCommand>,
    private var helloState: HelloState
) : AbstractBehavior<HelloStateActorCommand>(context) {

    /** Greetings accepted since the last [ResetHelloCount]. */
    private var helloCount: Int = 0

    /** Greetings accepted since the actor was created; never reset. */
    private var helloTotalCount: Int = 0

    // Resolved once here (on the actor thread) so the stream sink below never
    // has to reach into ActorContext from a stream thread.
    private val selfRef = context.self

    private val materializer = Materializer.createMaterializer(context.system)

    /** Throttled queue: offered [HelloLimit] commands are re-sent to this actor as [Hello]. */
    private val helloLimitSource = Source.queue<HelloLimit>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd ->
            selfRef.tell(Hello(cmd.message, cmd.replyTo))
        })
        .run(materializer)

    init {
        // The message doubles as the timer key, which is what StopResetTimer cancels.
        timers.startTimerAtFixedRate(ResetHelloCount, Duration.ofSeconds(10))
    }

    /** Routes each [HelloStateActorCommand] subtype to its handler. */
    override fun createReceive(): Receive<HelloStateActorCommand> {
        return newReceiveBuilder()
            .onMessage(Hello::class.java, this::onHello)
            .onMessage(HelloLimit::class.java, this::onHelloLimit)
            .onMessage(GetHelloCount::class.java, this::onGetHelloCount)
            .onMessage(GetHelloTotalCount::class.java, this::onGetHelloTotalCount)
            .onMessage(ChangeState::class.java, this::onChangeState)
            .onMessage(ResetHelloCount::class.java, this::onResetHelloCount)
            .onMessage(StopResetTimer::class.java) {
                timers.cancel(ResetHelloCount)
                Behaviors.same()
            }
            .build()
    }

    /**
     * Replies to a greeting depending on the current state.
     *
     * NOTE(review): in the HAPPY state a message other than "Hello" gets no reply,
     * so an ask-style caller would run into its timeout — confirm this is intended.
     */
    private fun onHello(command: Hello): Behavior<HelloStateActorCommand> {
        when (helloState) {
            HelloState.HAPPY -> {
                if (command.message == "Hello") {
                    helloCount++
                    helloTotalCount++
                    command.replyTo.tell(HelloResponse("Kotlin"))
                    context.log.info("onHello-Kotlin")
                }
            }
            HelloState.ANGRY -> {
                command.replyTo.tell(HelloResponse("Don't talk to me!"))
            }
        }
        return this
    }

    /**
     * Offers the command to the throttled queue and logs the outcome.
     *
     * The completion callback may run on a thread other than the actor's, so it
     * logs through [callbackLog] instead of `context.log`, and it also reports
     * an offer future that completes exceptionally.
     */
    private fun onHelloLimit(command: HelloLimit): Behavior<HelloStateActorCommand> {
        helloLimitSource.offer(command).whenComplete { result, failure ->
            when {
                failure != null -> callbackLog.error("Failed to enqueue command", failure)
                result is QueueOfferResult.`Enqueued$` -> callbackLog.info("Command enqueued successfully")
                result is QueueOfferResult.`Dropped$` -> callbackLog.error("Command dropped")
                result is QueueOfferResult.Failure -> callbackLog.error("Failed to enqueue command", result.cause())
                result is QueueOfferResult.`QueueClosed$` -> callbackLog.error("Queue was closed")
            }
        }
        return this
    }

    /** Replies with the current (resettable) hello count. */
    private fun onGetHelloCount(command: GetHelloCount): Behavior<HelloStateActorCommand> {
        command.replyTo.tell(HelloCountResponse(helloCount))
        context.log.info("onGetHelloCount-helloCount: $helloCount")
        return this
    }

    /** Replies with the total hello count. */
    private fun onGetHelloTotalCount(command: GetHelloTotalCount): Behavior<HelloStateActorCommand> {
        command.replyTo.tell(HelloCountResponse(helloTotalCount))
        return this
    }

    /** Switches to the state carried by the command. */
    private fun onChangeState(command: ChangeState): Behavior<HelloStateActorCommand> {
        helloState = command.newHelloState
        return this
    }

    /** Timer callback: zeroes the resettable hello count. */
    private fun onResetHelloCount(command: ResetHelloCount): Behavior<HelloStateActorCommand> {
        context.log.info("Resetting hello count")
        helloCount = 0
        return this
    }

    companion object {
        /** Plain SLF4J logger for callbacks that run outside the actor's message processing. */
        private val callbackLog = org.slf4j.LoggerFactory.getLogger(HelloStateActor::class.java)

        /**
         * Builds the actor behavior with timer support.
         *
         * @param initialHelloState state the actor starts in
         */
        fun create(initialHelloState: HelloState): Behavior<HelloStateActorCommand> {
            return Behaviors.withTimers { timers ->
                Behaviors.setup { context -> HelloStateActor(context, timers, initialHelloState) }
            }
        }
    }
}
  • 타이머와 이벤트(메시지)에 따라 상태가 변하는 기본 액터 샘플입니다.


API와 연결


/** REST endpoints that query the HelloStateActor through the ask pattern (3 second timeout). */
@RestController
@RequestMapping("/api/actor")
@Tag(name = "Actor Controller")
class ActorController @Autowired constructor(private val akka: AkkaConfiguration) {

    private val helloState: ActorSystem<HelloStateActorCommand> = akka.getHelloState()

    /** Sends `Hello` and awaits the reply inside a coroutine (Kotlin `await`). */
    @PostMapping("/hello")
    fun helloCommand(): Mono<String> = mono {
        val pendingReply = AskPattern.ask(
            helloState,
            { replyTo: ActorRef<Any> -> Hello("Hello", replyTo) },
            Duration.ofSeconds(3),
            akka.getScheduler()
        ).toCompletableFuture()

        val helloResponse = pendingReply.await() as HelloResponse
        "helloResponse.message: ${helloResponse.message}"
    }

    /** Asks for the hello count and adapts the resulting CompletableFuture to a Mono. */
    @GetMapping("/hello-count")
    fun helloCountCommand(): Mono<String> {
        val pendingReply = AskPattern.ask(
            helloState,
            { replyTo: ActorRef<Any> -> GetHelloCount(replyTo) },
            Duration.ofSeconds(3),
            akka.getScheduler()
        ).toCompletableFuture()

        return Mono.fromFuture(pendingReply)
            .map { reply -> reply as HelloCountResponse }
            .map { countReply -> "helloCountResponse.count: ${countReply.count}" }
    }
}


  • 비동기 완료는 코틀린이 지원하는 await와 자바가 기본으로 지원하는 CompletableFuture, 두 가지 방식을 이용할 수 있습니다. Akka(Pekko)의 비동기 완료는 언어가 기본으로 제공하는 비동기 처리와 함께 이용할 수 있으므로 기본기가 중요합니다.

  • ask : WebFlux 방식을 이용해 액터에게 메시지를 보낸 후 응답을 받는 샘플입니다.

  • tell : 응답이 필요 없는 경우에는 ask 대신 tell로 메시지만 보내면 됩니다(비동기 이벤트 발생, fire-and-forget).


기타 플러그인

스프링 부트의 동작 설정을 application.properties에 정의하듯이, 액터 시스템의 설정은 application.conf(HOCON 형식)에 정의할 수 있으며, 이때 IDE 내에서 해당 설정 파일의 편집을 도와주는 플러그인을 활용할 수 있습니다.



Swagger

  • Reactive 모드의 API로 Actor 모델과 상호 운영이 가능합니다. 작동 스레드 스케줄러가 코루틴과 액터 시스템으로 분리된 점은 장점이 될 수도 있고 단점이 될 수도 있지만, 웹과 분리되어 동작하는 별도의 스케줄러가 있다는 점을 이해하고 활용 지점을 찾는 것이 중요합니다.

    • Spring Boot MVC와 WebFlux를 함께 사용할 때 API는 Servlet 기반으로 동작하고 WebFlux는 별도의 스레드 풀에서 동작하므로, 서로 다르게 동작하는 두 가지 Dispatcher를 관리해야 하는 경우와 유사합니다.






  • No labels