Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Info

리액티브 스트림을 반영해 지능형 AI 서비스 아키텍처를 보강해죠


보너스

Code Block
themeEmacs
LLM 호출 및 AI 모듈과 연결할 때, LLM을 안정적이게 사용할수 있는 backpressure 를 이용하는 방법의 샘플 코드를 akka stream을 이용한 샘플코드를 작성해 스칼라/자바/코틀린 버전으로 각각 작성


✅ 전제 시나리오

  • 외부 LLM API를 호출 (느린 응답 가능성 있음)

  • 요청은 큐 또는 스트림으로 연속적으로 들어옴

  • Akka Stream이 처리 속도를 자동 조절 (Backpressure)


✅ 1. Scala 버전 (Akka Streams)

Code Block
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random

object LlmBackpressureScala extends App {
  implicit val system = ActorSystem("llm-system")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  // 가상의 LLM API 호출 (느린 호출)
  def callLLM(input: String): Future[String] = Future {
    Thread.sleep(Random.nextInt(1000)) // 임의의 지연
    s"LLM Response for [$input]"
  }

  val stream = Source(1 to 100)
    .throttle(5, 1.second) // 입력 제한
    .map(i => s"input-$i")
    .via(Flow[String].mapAsync(parallelism = 4)(callLLM)) // backpressure 적용
    .runWith(Sink.foreach(resp => println(s"[LLM] $resp")))

  stream.onComplete(_ => system.terminate())
}



✅ 2. Java 버전 (Akka Streams)

Code Block
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.*;
import scala.concurrent.duration.Duration;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

public class LlmBackpressureJava {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("llm-system");
        ActorMaterializer materializer = ActorMaterializer.create(system);

        Flow<String, String, NotUsed> llmFlow = Flow.of(String.class)
            .mapAsync(4, input -> callLLM(input));

        Source.from(IntStream.rangeClosed(1, 100).boxed().toList())
            .throttle(5, Duration.create(1, "second"))
            .map(i -> "input-" + i)
            .via(llmFlow)
            .runWith(Sink.foreach(resp -> System.out.println("[LLM] " + resp)), materializer)
            .whenComplete((done, exc) -> system.terminate());
    }

    static CompletionStage<String> callLLM(String input) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException ignored) {}
            return "LLM Response for [" + input + "]";
        });
    }
}



✅ 3. Kotlin 버전 (Akka Streams + Kotlin DSL)

Code Block
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.*
import scala.concurrent.duration.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadLocalRandom

fun main() {
    val system = ActorSystem.create("llm-system")
    val materializer = ActorMaterializer.create(system)

    val llmFlow = Flow.of(String::class.java)
        .mapAsync(4) { input -> callLLM(input) }

    val inputList = (1..100).map { "input-$it" }

    Source.from(inputList)
        .throttle(5, Duration.create(1, "second"))
        .via(llmFlow)
        .runWith(Sink.foreach { println("[LLM] $it") }, materializer)
        .whenComplete { _, _ -> system.terminate() }
}

fun callLLM(input: String): CompletableFuture<String> {
    return CompletableFuture.supplyAsync {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1000))
        "LLM Response for [$input]"
    }
}



✅ 핵심 요점 (적용 시 주의사항)


항목설명
mapAsync(parallelism = N)N개의 요청만 동시에 실행 (backpressure 핵심)
throttle외부 API 보호를 위한 속도 제한
callLLM실제 HTTP 클라이언트로 대체 가능 (WebClient, OkHttp 등)
Sink.foreach결과 소비자 (후처리 로직 삽입 가능)
Code Block
themeEmacs
callLLM 을 HuggingFace API 호출한버전?



Code Block
themeEmacs
def callLLM(input: String): Future[String] = {
  import scala.concurrent.ExecutionContext.Implicits.global
  import sttp.client3._
  import sttp.client3.circe._
  import io.circe._, io.circe.parser._, io.circe.syntax._

  val backend = HttpURLConnectionBackend()
  val token = "<YOUR_HF_TOKEN>"
  val model = "gpt2"

  val bodyJson = Map("inputs" -> input).asJson.noSpaces

  Future {
    val request = basicRequest
      .post(uri"https://api-inference.huggingface.co/models/$model")
      .header("Authorization", s"Bearer $token")
      .body(bodyJson)
      .send(backend)

    val responseBody = request.body match {
      case Right(json) => parse(json).flatMap(_.hcursor.downArray.get[String]("generated_text")).getOrElse("Parse failed")
      case Left(err)   => s"Error: $err"
    }

    responseBody
  }
}


✅ 예시 입력/출력

  • 입력: "Translate this sentence into French: Hello, how are you?"

  • 출력: "Bonjour, comment ça va ?"

✅ 주의사항


항목설명
API 호출 속도HuggingFace는 무료 요금제에서 제한적 TPS 허용
모델 로딩 지연모델 첫 호출 시 cold-start 수초 소요 가능
JSON 파싱일부 모델은 generated_text 대신 다른 필드를 반환할 수 있음
에러 처리JSON parse 오류, 5xx 에러 등에 대비 필요