Spring Boot에 Pekko(Akka)를 탑재하면, 양쪽에 적용된 Reactive Streams 인터페이스를 공유하며 상호 운영할 수 있습니다. 웹 동시성 처리는 주로 WebFlux가 작동하는 코루틴에서 수행하며 100% 동시성 프로그래밍을 시도하게 되겠지만, 그래도 스레드 모델 또는 블로킹을 요구하는 작업이 필요할 때에는 별도의 스레드 Dispatcher로 작동되는 ActorSystem으로 성능을 분리하는 전략을 선택할 수도 있습니다. WebFlux는 100% 동시성 프로그래밍으로 작성하는 것이 권장되지만, 액터 모델은 동시성+병렬성 프로그래밍을 함께 운영할 수도 있습니다.
여기서는 코프링(Kotlin + Spring)에 액터 시스템을 자연스럽게 탑재하는 방법을 알아보겠습니다.
의존구성
// Gradle build script (Kotlin DSL) for the Spring Boot WebFlux + Pekko typed-actor sample.
plugins {
    kotlin("jvm") version "1.9.25"
    // kapt ships with the Kotlin compiler, so its version must match the other Kotlin plugins.
    kotlin("kapt") version "1.9.25"
    kotlin("plugin.spring") version "1.9.25"
    id("org.springframework.boot") version "3.3.4"
    id("io.spring.dependency-management") version "1.1.6"
}

group = "org.example"
version = "0.0.1-SNAPSHOT"

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

configurations {
    compileOnly {
        extendsFrom(configurations.annotationProcessor.get())
    }
}

repositories {
    mavenCentral()
}

// Versions that are not managed by the Spring Boot BOM.
val swaggerversion = "2.2.0"
val scalaVersion = "2.13"
val pekkoVersion = "1.1.2"
val pekkoR2dbcVersion = "1.0.0"
val jwtversion = "0.11.5"
val jacksonversion = "2.15.2"
val commonModelVersion = "0.0.1-SNAPSHOT"

dependencies {
    // External module
    implementation("org.example.labs:common-model:$commonModelVersion")

    // Jackson, JWT, etc.
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonversion")
    implementation("io.jsonwebtoken:jjwt-api:$jwtversion")
    implementation("io.jsonwebtoken:jjwt-impl:$jwtversion")
    implementation("io.jsonwebtoken:jjwt-jackson:$jwtversion")

    implementation("org.springframework.boot:spring-boot-starter-webflux")

    // Annotation processor.
    // Kotlin sources are only seen by processors registered through kapt;
    // the plain `annotationProcessor` configuration covers Java sources only.
    kapt("org.springframework.boot:spring-boot-configuration-processor")

    // Kotlin support
    implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")

    implementation("org.springframework.boot:spring-boot-starter-actuator")

    // Logging
    implementation("ch.qos.logback:logback-classic")

    // Reactive Stream
    implementation("org.apache.kafka:kafka-streams")

    // Actor System
    implementation(platform("org.apache.pekko:pekko-bom_$scalaVersion:$pekkoVersion"))

    // Classic Actor (disabled): this project uses typed actors built with the Behavior pattern.
    //implementation("org.apache.pekko:pekko-actor_$scalaVersion:$pekkoVersion")
    //implementation("org.apache.pekko:pekko-stream_$scalaVersion:$pekkoVersion")

    // Typed Actor
    implementation("org.apache.pekko:pekko-actor-typed_$scalaVersion:$pekkoVersion")
    implementation("org.apache.pekko:pekko-serialization-jackson_$scalaVersion:$pekkoVersion")

    // Actor Persistence
    implementation("org.apache.pekko:pekko-persistence-typed_$scalaVersion:$pekkoVersion")
    implementation("org.apache.pekko:pekko-persistence-query_$scalaVersion:$pekkoVersion")
    implementation("org.apache.pekko:pekko-persistence-r2dbc_$scalaVersion:$pekkoR2dbcVersion")

    // Actor Logging
    implementation("org.apache.pekko:pekko-slf4j_$scalaVersion:$pekkoVersion")

    // Actor TestKit
    testImplementation("org.apache.pekko:pekko-testkit_$scalaVersion:$pekkoVersion")
    testImplementation("org.apache.pekko:pekko-actor-testkit-typed_$scalaVersion:$pekkoVersion")
    testImplementation("org.apache.pekko:pekko-stream-testkit_$scalaVersion:$pekkoVersion")
    testImplementation("org.apache.pekko:pekko-persistence-testkit_$scalaVersion:$pekkoVersion")

    // Test
    developmentOnly("org.springframework.boot:spring-boot-devtools")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("io.projectreactor:reactor-test")
    testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
    testRuntimeOnly("org.junit.platform:junit-platform-launcher")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.3")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.9.3")
    testImplementation("com.squareup.okhttp3:okhttp:4.12.0")

    // Swagger
    implementation("org.springdoc:springdoc-openapi-starter-webflux-ui:$swaggerversion")
}

kotlin {
    compilerOptions {
        // Treat JSR-305 nullability annotations as strict.
        freeCompilerArgs.addAll("-Xjsr305=strict")
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
AkkaConfig
package org.example.kotlinbootreactivelabs.config

import jakarta.annotation.PostConstruct
import jakarta.annotation.PreDestroy
import org.apache.pekko.actor.typed.ActorSystem
import org.example.kotlinbootreactivelabs.actor.MainStageActor
import org.example.kotlinbootreactivelabs.actor.MainStageActorCommand
import org.example.kotlinbootreactivelabs.actor.hellostate.HelloState
import org.example.kotlinbootreactivelabs.actor.hellostate.HelloStateActor
import org.example.kotlinbootreactivelabs.actor.hellostate.HelloStateActorCommand
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Configuration

/**
 * Spring configuration that owns the typed actor system whose guardian is [HelloStateActor].
 *
 * The system is created in [init] and terminated in [shutdown].
 */
@Configuration
class AkkaConfiguration {

    private val logger: org.slf4j.Logger = LoggerFactory.getLogger(AkkaConfiguration::class.java)

    // Assigned in init(); lateinit because the value is produced by the @PostConstruct callback.
    private lateinit var helloState: ActorSystem<HelloStateActorCommand>

    /** Creates the actor system with [HelloStateActor] in its initial HAPPY state. */
    @PostConstruct
    fun init() {
        logger.info("Starting Akka Actor System")
        helloState = ActorSystem.create(HelloStateActor.create(HelloState.HAPPY), "HelloStateActor")
    }

    /** Terminates the actor system created by [init]; does nothing if it was never created. */
    @PreDestroy
    fun shutdown() {
        // Guard against init() having failed before the assignment, which would otherwise
        // raise UninitializedPropertyAccessException here.
        if (::helloState.isInitialized) {
            helloState.terminate()
        }
    }

    /** Returns the actor system, which also serves as the reference to the guardian actor. */
    fun getHelloState(): ActorSystem<HelloStateActorCommand> = helloState

    /** Returns the scheduler of the actor system, as required by the ask pattern. */
    fun getScheduler() = helloState.scheduler()
}
액터 객체 자체를 DI로 설정할 수도 있지만, 위와 같이 설정 클래스에서 액터 시스템을 구성해 두고 주입받아 사용할 수도 있습니다.
액터작성
/** Commands that [HelloStateActor] can handle. */
sealed class HelloStateActorCommand
data class Hello(val message: String, val replyTo: ActorRef<Any>) : HelloStateActorCommand()
data class GetHelloCount(val replyTo: ActorRef<Any>) : HelloStateActorCommand()
data class GetHelloTotalCount(val replyTo: ActorRef<Any>) : HelloStateActorCommand()
data class ChangeState(val newHelloState: HelloState) : HelloStateActorCommand()
data class HelloLimit(val message: String, val replyTo: ActorRef<Any>) : HelloStateActorCommand()
object ResetHelloCount : HelloStateActorCommand()
object StopResetTimer : HelloStateActorCommand()

/** Responses that [HelloStateActor] can send back. */
sealed class HelloStateActorResponse
data class HelloResponse(val message: String) : HelloStateActorResponse()
data class HelloCountResponse(val count: Int) : HelloStateActorResponse()

/** Mood of the actor; selects how [Hello] is answered. */
enum class HelloState {
    HAPPY, ANGRY
}

/**
 * Typed actor that answers [Hello] according to its current [HelloState] and keeps two
 * counters: one reset by a fixed-rate timer and one running total.
 *
 * [HelloLimit] commands are pushed through a throttled stream queue and re-delivered to
 * this actor as [Hello] at a rate of at most 3 per second.
 */
class HelloStateActor private constructor(
    private val context: ActorContext<HelloStateActorCommand>,
    private val timers: TimerScheduler<HelloStateActorCommand>,
    private var helloState: HelloState
) : AbstractBehavior<HelloStateActorCommand>(context) {

    // ActorContext must only be touched from the actor's own message-processing thread.
    // The stream sink and the offer() completion callback run on other threads, so the
    // logger and the self reference are captured here, once, during construction.
    private val log = context.log
    private val self = context.self

    // Hellos counted since the last ResetHelloCount tick.
    private var helloCount: Int = 0

    // Hellos counted since the actor started; never reset.
    private var helloTotalCount: Int = 0

    private val materializer = Materializer.createMaterializer(context.system)

    // Queue with a buffer of 100 and backpressure overflow strategy, throttled to
    // 3 elements per second; each element is forwarded to this actor as a Hello.
    private val helloLimitSource = Source.queue<HelloLimit>(100, OverflowStrategy.backpressure())
        .throttle(3, Duration.ofSeconds(1))
        .to(Sink.foreach { cmd -> self.tell(Hello(cmd.message, cmd.replyTo)) })
        .run(materializer)

    init {
        // Sends ResetHelloCount to this actor every 10 seconds.
        timers.startTimerAtFixedRate(ResetHelloCount, Duration.ofSeconds(10))
    }

    override fun createReceive(): Receive<HelloStateActorCommand> {
        return newReceiveBuilder()
            .onMessage(Hello::class.java, this::onHello)
            .onMessage(HelloLimit::class.java, this::onHelloLimit)
            .onMessage(GetHelloCount::class.java, this::onGetHelloCount)
            .onMessage(GetHelloTotalCount::class.java, this::onGetHelloTotalCount)
            .onMessage(ChangeState::class.java, this::onChangeState)
            .onMessage(ResetHelloCount::class.java, this::onResetHelloCount)
            .onMessage(StopResetTimer::class.java) {
                // The timer was started with the message itself as its key.
                timers.cancel(ResetHelloCount)
                Behaviors.same()
            }
            .build()
    }

    /**
     * Replies to a [Hello] depending on the current state.
     *
     * NOTE(review): in the HAPPY state a message other than "Hello" gets no reply, so a
     * caller using the ask pattern would run into its timeout — confirm this is intended.
     */
    private fun onHello(command: Hello): Behavior<HelloStateActorCommand> {
        when (helloState) {
            HelloState.HAPPY -> {
                if (command.message == "Hello") {
                    helloCount++
                    helloTotalCount++
                    command.replyTo.tell(HelloResponse("Kotlin"))
                    context.log.info("onHello-Kotlin")
                }
            }
            HelloState.ANGRY -> {
                command.replyTo.tell(HelloResponse("Don't talk to me!"))
            }
        }
        return this
    }

    /** Offers the command to the throttled queue and logs the outcome of the offer. */
    private fun onHelloLimit(command: HelloLimit): Behavior<HelloStateActorCommand> {
        helloLimitSource.offer(command).thenAccept { result ->
            // Runs when the offer completes, possibly off the actor thread: use the
            // captured logger rather than context.log.
            when (result) {
                is QueueOfferResult.`Enqueued$` -> log.info("Command enqueued successfully")
                is QueueOfferResult.`Dropped$` -> log.error("Command dropped")
                is QueueOfferResult.Failure -> log.error("Failed to enqueue command", result.cause())
                is QueueOfferResult.`QueueClosed$` -> log.error("Queue was closed")
            }
        }
        return this
    }

    /** Replies with the counter that is reset by the timer. */
    private fun onGetHelloCount(command: GetHelloCount): Behavior<HelloStateActorCommand> {
        command.replyTo.tell(HelloCountResponse(helloCount))
        context.log.info("onGetHelloCount-helloCount: $helloCount")
        return this
    }

    /** Replies with the running total counter. */
    private fun onGetHelloTotalCount(command: GetHelloTotalCount): Behavior<HelloStateActorCommand> {
        command.replyTo.tell(HelloCountResponse(helloTotalCount))
        return this
    }

    /** Switches the actor to the requested state. */
    private fun onChangeState(command: ChangeState): Behavior<HelloStateActorCommand> {
        helloState = command.newHelloState
        return this
    }

    /** Timer tick: resets the periodic counter; the total is left untouched. */
    private fun onResetHelloCount(command: ResetHelloCount): Behavior<HelloStateActorCommand> {
        context.log.info("Resetting hello count")
        helloCount = 0
        return this
    }

    companion object {
        /** Creates the behavior with timer support, starting in [initialHelloState]. */
        fun create(initialHelloState: HelloState): Behavior<HelloStateActorCommand> {
            return Behaviors.withTimers { timers ->
                Behaviors.setup { context ->
                    HelloStateActor(context, timers, initialHelloState)
                }
            }
        }
    }
}
타이머와 이벤트에 따라 상태 변화를 가지는 기본 액터 샘플입니다.
API와 연결
/**
 * REST endpoints that forward requests to the HelloState actor with the ask pattern
 * and expose the reply as a [Mono].
 */
@RestController
@RequestMapping("/api/actor")
@Tag(name = "Actor Controller")
class ActorController @Autowired constructor(private val akka: AkkaConfiguration) {

    private val helloState: ActorSystem<HelloStateActorCommand> = akka.getHelloState()

    /** Sends `Hello("Hello")` and awaits the reply inside a coroutine (`await`). */
    @PostMapping("/hello")
    fun helloCommand(): Mono<String> = mono {
        val pendingReply = AskPattern.ask(
            helloState,
            { replyTo: ActorRef<Any> -> Hello("Hello", replyTo) },
            Duration.ofSeconds(3),
            akka.getScheduler(),
        )
        val reply = pendingReply.toCompletableFuture().await() as HelloResponse
        "helloResponse.message: ${reply.message}"
    }

    /** Asks for the current hello count and adapts the `CompletableFuture` to a [Mono]. */
    @GetMapping("/hello-count")
    fun helloCountCommand(): Mono<String> {
        val pendingReply = AskPattern.ask(
            helloState,
            { replyTo: ActorRef<Any> -> GetHelloCount(replyTo) },
            Duration.ofSeconds(3),
            akka.getScheduler(),
        ).toCompletableFuture()

        return Mono.fromFuture(pendingReply).map { reply ->
            val countReply = reply as HelloCountResponse
            "helloCountResponse.count: ${countReply.count}"
        }
    }
}
비동기 완료는 코틀린이 지원하는 await와 기본자바가 지원하는
CompletableFuture
두 가지 방식을 이용할 수 있습니다. Akka의 비동기 완료는 기본 언어가 가진 비동기 처리를 함께 이용할 수 있으므로 기본기는 중요합니다.
ask : WebFlux 방식을 이용해 액터에게 메시지를 보낸 후 응답을 받는 샘플입니다.
tell : 응답이 필요 없는 경우에는 비동기 이벤트 발생으로 간주하고 tell만 하면 됩니다.
기타 플러그인
Spring Boot의 작동 설정 요소를 application.properties에 정의하듯이, 액터 시스템의 설정 요소는 application.conf(HOCON 방식)로 정의할 수 있으며, IDE 내에서 이 설정 편집을 도와주는 플러그인입니다.
Swagger
Reactive Mode의 API로 Actor 모델과 상호 운영이 가능합니다. 작동 스레드 스케줄러가 코루틴 vs 액터 시스템으로 분리된 점은 장점이 될 수도 있고 단점이 될 수도 있지만, 웹과 분리되어 작동되는 별도 스케줄러가 있는 점을 이해하고 활용 지점을 찾는 것은 중요합니다.
Spring Boot MVC + WebFlux를 함께 활용할 때 API는 Servlet 기반으로 작동하고 WebFlux는 별도의 스레드 풀로 동작하므로, 서로 다르게 작동되는 두 가지 Dispatcher를 관리해야 하는 케이스와 유사합니다.