Page History
...
액터에게 메시지를 보내는 패턴은 크게 두 가지가 있습니다.
- tell : send 이후 응답값을 대기할 필요가 없는 방식 (just tell, fire and forget)
- ask : send 이후 응답값을 필요로 하여 대기해야 하는 케이스
자바가 기본 제공하는 CompletionStage가 맘에 안 들기도 해서, ASK 패턴을 4가지 방식으로 이용하는 코드를 작성하고 코루틴을 위한 유틸 클래스도 생성했습니다.
Akka HTTP를 사용하면 더 심플하게 이용할 수 있지만, 코프링 기반에서는 취향에 맞는 방식으로 통일할 수 있습니다.
- CompletionStage - 자바 기본
- Async ( 코루틴 ) - Async가 없는 자바에서 Async/Await를 지원
- Mono ( WebFlux ) - WebFlux의 비동기 처리
- AkkaStream - AKKA의 Stream처리로 Java9 Stream API와 유사
Code Block | ||
---|---|---|
| ||
/**
 * Admin REST endpoints that query the channel supervisor actor.
 *
 * Every endpoint sends the same `GetAllCounselorManagers` request to
 * [supervisorChannelActor] and maps an `AllCounselorManagers` reply to its
 * `channels` list (any other reply becomes an empty list). The endpoints differ
 * only in how the asynchronous reply is surfaced to the web layer:
 * `CompletionStage`, Akka Streams, a blocking coroutine bridge, and Reactor `Mono`.
 */
@RestController
@RequestMapping("/api/admin/channel")
class AdminChannelController(
    private val actorSystem: ActorSystem<MainStageActorCommand>,
    private val supervisorChannelActor: ActorRef<SupervisorChannelCommand>
) {

    /** Timeout passed to every ask issued by this controller. */
    private val timeout: Duration = Duration.ofSeconds(5)

    /** Plain `AskPattern.ask`, returning the resulting `CompletionStage` directly. */
    @GetMapping("/list-counselor-managers")
    fun listCounselorManagers(): CompletionStage<List<String>>? {
        return AskPattern.ask(
            supervisorChannelActor,
            { replyTo: ActorRef<SupervisorChannelResponse> -> GetAllCounselorManagers(replyTo) },
            timeout,
            actorSystem.scheduler()
        ).thenApply { response ->
            when (response) {
                is AllCounselorManagers -> response.channels
                else -> emptyList()
            }
        }
    }

    /** Same ask, driven through a single-element Akka Stream and materialized with `Sink.head()`. */
    @GetMapping("/list-counselor-managers-stream")
    fun listCounselorManagersByStream(): CompletionStage<List<String>> {
        return Source.single(Unit)
            .mapAsync(1) {
                AskPattern.ask(
                    supervisorChannelActor,
                    { replyTo: ActorRef<SupervisorChannelResponse> -> GetAllCounselorManagers(replyTo) },
                    timeout,
                    actorSystem.scheduler()
                )
            }
            .runWith(Sink.head(), actorSystem)
            .thenApply { response ->
                when (response) {
                    is AllCounselorManagers -> response.channels
                    else -> emptyList()
                }
            }
    }

    /**
     * Same ask via `AkkaUtils.runBlockingAsk`; the reply is obtained before this
     * function returns, so the result is a plain list rather than an async wrapper.
     */
    @GetMapping("/list-counselor-managers-async")
    fun listCounselorManagersByCoroutines(): List<String> {
        val response = AkkaUtils.runBlockingAsk(
            supervisorChannelActor,
            { replyTo: ActorRef<SupervisorChannelResponse> -> GetAllCounselorManagers(replyTo) },
            timeout,
            actorSystem
        )
        return when (response) {
            is AllCounselorManagers -> response.channels
            else -> emptyList()
        }
    }

    /** Same ask via `AkkaUtils.askActorByMono`, exposing the reply as a Reactor `Mono`. */
    @GetMapping("/list-counselor-managers-mono")
    fun listCounselorManagersByMono(): Mono<List<String>> {
        return AkkaUtils.askActorByMono(
            supervisorChannelActor,
            { replyTo: ActorRef<SupervisorChannelResponse> -> GetAllCounselorManagers(replyTo) },
            timeout,
            actorSystem
        ).map { response ->
            when (response) {
                is AllCounselorManagers -> response.channels
                else -> emptyList()
            }
        }
    }
} |
- 더 줄이고 싶지만 여기까지, 개인적으로 코루틴의 async 방식이 군더더기가 없는 듯
Code Block | ||
---|---|---|
| ||
package com.example.kotlinbootlabs.module

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.javadsl.AskPattern
import java.time.Duration
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

/**
 * Kotlin-friendly wrappers around Akka Typed's Java `AskPattern.ask`.
 *
 * All helpers share the same parameters:
 * - `actor`: the actor to ask.
 * - `message`: factory that builds the request from the temporary reply-to ref.
 * - `timeout`: passed straight through to `AskPattern.ask`.
 * - `actorSystem`: supplies the scheduler used by the ask.
 */
object AkkaUtils {

    /**
     * Asks [actor] and suspends until the reply stage completes.
     *
     * @return the reply of type [R]
     */
    suspend fun <T, R> askActor(
        actor: ActorRef<T>,
        message: (ActorRef<R>) -> T,
        timeout: Duration,
        actorSystem: ActorSystem<*>
    ): R {
        return AskPattern.ask(
            actor,
            message,
            timeout,
            actorSystem.scheduler()
        ).await()
    }

    /**
     * Blocking bridge for non-suspending callers: runs [askActor] inside
     * `runBlocking`, so the calling thread is held until the reply arrives.
     *
     * @return the reply of type [R]
     */
    fun <T, R> runBlockingAsk(
        actor: ActorRef<T>,
        message: (ActorRef<R>) -> T,
        timeout: Duration,
        actorSystem: ActorSystem<*>
    ): R = runBlocking {
        askActor(actor, message, timeout, actorSystem)
    }

    /**
     * Asks [actor] and exposes the reply as a Reactor [Mono].
     *
     * The ask is issued when this function is called, not when the returned
     * [Mono] is subscribed: `AskPattern.ask` is evaluated as the argument to
     * `Mono.fromCompletionStage`.
     *
     * @return a [Mono] backed by the reply stage
     */
    fun <T, R> askActorByMono(
        actor: ActorRef<T>,
        message: (ActorRef<R>) -> T,
        timeout: Duration,
        actorSystem: ActorSystem<*>
    ): Mono<R> {
        return Mono.fromCompletionStage(
            AskPattern.ask(
                actor,
                message,
                timeout,
                actorSystem.scheduler()
            )
        )
    }
} |
- AKKA가 Scala/JAVA만 기본으로 지원하고 Kotlin은 알아서 호환해야 하다 보니, 자바 코드에서 오는 괴리감을 이 유틸 클래스가 처리해 주는 것으로
...