Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 프로덕션 그레이드 플랫폼이 필요하며, AI 패턴을 구현하기 위해 서로 다른 도구를 사용할 수 있어야 한다. [301]

  • 실패를 수용하고, 실패를 모델링해야 하며, 플랫폼이 해결해야 할 회복력의 수준을 고려해야 한다. [304]

  • 24-48시간 내에 개념 증명을 제공하며, 흥미로운 사례가 있다면 연락하라고 권장한다. [307]

  • 백프레셔는 사용하는 스트리밍 라이브러리 내에서 구축되며, 여러 상태 기반 클라이언트를 통해 요청을 큐잉할 수 있다. [312]

  • LLM이 응답을 바탕으로 호출해야 할 도구를 오케스트레이션하며, 이를 통해 동적 실행이 가능하다. [347]



보너스

이 동영상의 내용중 Stream을 이용하는 부분만 캡쳐해 코드생성을 시도해보았습니다. 

다양한 스트림장치와 연동되는 AkkaStream 구현체의 느낌정도로 살펴보면 되겠습니다.

업로드한 이미지Image Added


Info

akka stream을 이용 grpc와 상호연동하면서 llm을 이용하는 샘플코드작성


🧩 목표 아키텍처

Code Block
themeEmacs
gRPC Server (LLM Proxy)
    ▲
    │   ← Akka Stream (Source.queue → Flow → Sink)
    │
Pekko Actor (LLMProcessorActor)
    ▲
    │   ← HelloActor / Router 에서 메시지 전달
    │
Client (gRPC 요청 송신자)


1. gRPC 프로토 정의 (예: llm.proto)

Code Block
themeEmacs
syntax = "proto3";

service LLMService {
  rpc Ask (LLMRequest) returns (LLMReply);
}

message LLMRequest {
  string prompt = 1;
}

message LLMReply {
  string response = 1;
}


2. LLM Stream Client 구성 (LLMClientStream.kt)


Code Block
themeEmacs
package org.example.grpc

import org.apache.pekko.stream.javadsl.*
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.Done
import java.util.concurrent.CompletionStage

class LLMClientStream(system: ActorSystem<*>, private val grpcStub: LLMServiceStub) {

    private val materializer = Materializer.createMaterializer(system)
    private val sourceQueue =
        Source.queue<LLMRequest>(10, OverflowStrategy.backpressure())
            .via(Flow.fromFunction { req: LLMRequest ->
                grpcStub.ask(req).toCompletableFuture().get()
            })
            .toMat(Sink.foreach { reply ->
                println(">> LLM 응답: ${reply.response}")
            }, Keep.left())
            .run(materializer)

    fun askLLM(prompt: String): CompletionStage<Done> {
        return sourceQueue.offer(LLMRequest.newBuilder().setPrompt(prompt).build())
    }
}



3. Actor에서 gRPC 스트림 호출 (LLMProcessorActor.kt)

Code Block
themeEmacs
package org.example.actor.llm

import org.apache.pekko.actor.typed.*
import org.apache.pekko.actor.typed.javadsl.*
import org.example.grpc.LLMClientStream

sealed class LLMCommand
data class AskPrompt(val prompt: String) : LLMCommand()

class LLMProcessorActor(
    context: ActorContext<LLMCommand>,
    private val llmClient: LLMClientStream
) : AbstractBehavior<LLMCommand>(context) {

    companion object {
        fun create(llmClient: LLMClientStream): Behavior<LLMCommand> {
            return Behaviors.setup { ctx -> LLMProcessorActor(ctx, llmClient) }
        }
    }

    override fun createReceive(): Receive<LLMCommand> {
        return newReceiveBuilder()
            .onMessage(AskPrompt::class.java) { onAskPrompt(it) }
            .build()
    }

    private fun onAskPrompt(command: AskPrompt): Behavior<LLMCommand> {
        llmClient.askLLM(command.prompt)
        return this
    }
}


4. 예시 Actor에서 LLM 호출 (HelloActor.kt 연동)

Code Block
themeEmacs
helloActor.tell(AskPrompt("Explain actor model in 2 lines"))


5. gRPC 서버는 Bufstream 혹은 일반 gRPC 서버로 구동 (Bufstream 호환 구성 가능)

...

✅ 특징 요약

  • Akka Streams + gRPC 조합으로 gRPC 요청을 backpressure-aware하게 처리.

  • LLMProcessorActorqueue에 LLM 요청을 넣고 결과는 비동기 처리.

  • HelloActor 혹은 RouterActor에서 AskPrompt 메시지로 연동 가능.

  • Bufstream과도 동일한 원리로 적용 가능 (gRPC 기반이므로 호환).