Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

공식적으로는 ASK패턴으로 액터에게 메시지를 보내는 패턴은 크게는 두가지가 있습니다. 

  • tell : send이후 응답값이 없어 just tell하는 경우응답값을 대기할 필요 없는 방식  just tell로 forgot and fire
  • ask : send이후 응답값을 대기하는경우필요로하는경우 대기해야하는 케이스


자바가 기본 제공하는 CompletionStage가 맘에 안들기도 해서, ASK 패턴을 3가지 4가지 방식을 이용하는 코드를 작성 코르틴을 위한 유틸 클래스도 생성

Akka HTTP를 사용하면 더 심플하게 이용할수 있지만 코프링기반에 취향에 맞는 방식으로 통일할수 있습니다.

  • CompletionStage AkkaStream- 자바기본
  • Async ( 코르틴 ) - Async가 없는 자바에서 Async/Await를 지원
  • Mono ( WebFlux ) - Webplux 의 비동기 처리
  • AkkaStream - AKKA의 Stream처리로 Java9 Stream API와 유사


Code Block
themeEmacs
@RestController
@RequestMapping("/api/admin/channel")
class AdminChannelController(private val actorSystem: ActorSystem<MainStageActorCommand>,
                             private val supervisorChannelActor: ActorRef<SupervisorChannelCommand>) {

    private val timeout: Duration = Duration.ofSeconds(5)

    @PostMapping@GetMapping("/addlist-counselor-managermanagers")
    fun addCounselorManager(@RequestParam channel: StringlistCounselorManagers(): CompletionStage<String>CompletionStage<List<String>>? {
        return AskPattern.ask(
            supervisorChannelActor,
            { replyTo: ActorRef<SupervisorChannelResponse> -> CreateCounselorManagerGetAllCounselorManagers(channel, replyTo) },
            timeout,
            actorSystem.scheduler()
        ).thenApply { response ->
            when (response) {
                is CounselorManagerCreated -> "Counselor Manager for channel $channel created successfully."
                is SupervisorErrorStringResponseAllCounselorManagers -> response.messagechannels
                else -> "Unknown error occurred."emptyList()
            }
        }
    }

    @GetMapping("/list-counselor-managers-stream")
    fun listCounselorManagerslistCounselorManagersByStream(): CompletionStage<List<String>>? {
        return AskPatternSource.asksingle(Unit)
             supervisorChannelActor,.mapAsync(1) {
            { replyTo: ActorRef<SupervisorChannelResponse> -> GetAllCounselorManagers(replyTo) },
 AskPattern.ask(
           timeout,
            actorSystem.scheduler()supervisorChannelActor,
        ).thenApply { response ->
         { replyTo: ActorRef<SupervisorChannelResponse> when-> GetAllCounselorManagers(responsereplyTo) {},
                 is AllCounselorManagers -> response.channels
 timeout,
                  else -> emptyListactorSystem.scheduler()
            }
    )
    }
    }

    @GetMapping("/list-counselor-managers-stream")}
    fun listCounselorManagersByStream(): CompletionStage<List<String>> {
        return Source.single(Unit.runWith(Sink.head(), actorSystem)
            .mapAsync(1)thenApply { response ->
                AskPattern.ask(when (response) {
                    supervisorChannelActor,
  is AllCounselorManagers -> response.channels
                  { replyTo: ActorRef<SupervisorChannelResponse>else -> GetAllCounselorManagersemptyList(replyTo)
 },
               }
     timeout,
       }
    }

    @GetMapping("/list-counselor-managers-async")
    fun actorSystem.schedulerlistCounselorManagersByCoroutines(): List<String> {
        val response = AkkaUtils.runBlockingAsk(
     )
       supervisorChannelActor,
     }
       { replyTo: ActorRef<SupervisorChannelResponse> ->  .runWith(Sink.head(), actorSystem)GetAllCounselorManagers(replyTo) },
            .thenApply { response ->
timeout, actorSystem
        )

        return when (response) {
                    is AllCounselorManagers -> response.channels
                    else -> emptyList()
                }
            }
    }

    @GetMapping("/list-counselor-managers-asyncmono")
    fun listCounselorManagersByCoroutineslistCounselorManagersByMono(): List<String>Mono<List<String>> {
        val response =return AkkaUtils.runBlockingAskaskActorByMono(
            supervisorChannelActor,
            { replyTo: ActorRef<SupervisorChannelResponse> -> GetAllCounselorManagers(replyTo) },
            timeout,
            actorSystem
        )

.map { response ->
           return when (response) {
                is AllCounselorManagers -> response.channels
                else -> emptyList()
            }
        }
    }
  • 더 줄이고 싶지만 여기까지, 개인적으로 코르틴의 async 방식이 군더더기가 없는듯



Code Block
themeEmacs
package com.example.kotlinbootlabs.module

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.AskPattern
import java.time.Duration
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

object AkkaUtils {
    suspend fun <T, R> askActor(
        actor: ActorRef<T>,
        message: (ActorRef<R>) -> T,
        timeout: Duration,
        actorSystem: ActorSystem<*>
    ): R {
        return AskPattern.ask(
            actor,
            message,
            timeout,
            actorSystem.scheduler()
        ).await()
    }

    fun <T, R> runBlockingAsk(
        actor: ActorRef<T>,
        message: (ActorRef<R>) -> T,
        timeout: Duration,
        actorSystem: ActorSystem<*>
    ): R = runBlocking {
        askActor(actor, message, timeout, actorSystem)
    }


    fun <T, R> askActorByMono(
        actor: ActorRef<T>,
        message: (ActorRef<R>) -> T,
        timeout: Duration,
        actorSystem: ActorSystem<*>
    ): Mono<R> {
        return Mono.fromCompletionStage(
            AskPattern.ask(
                actor,
                message,
                timeout,
                actorSystem.scheduler()
            )
        )
    }


}
  • AKKA가 Scala/JAVA만 기본으로 지원하고 Kotlin은 알아서 호환해야하다보니, 자바코드에서 오는 괴리감을 이 유틸 클래스가 처리해주는것으로


액터는 기본으로 기본언어가 가진 동시성 처리 스펙을 활용함으로, 액터이용에 앞서 기본언어가 지원하는 비동기 프로그래밍 방식을 먼저 학습할 필요가 있습니다.