Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Info

리액티브 스트림을 반영해 지능형 AI 서비스 아키텍처를 보강해죠


보너스 - Reactive Stream 버전으로 LLM호출하기

Code Block
themeEmacs
LLM 호출 및 AI 모듈과 연결할 때, LLM을 안정적이게 사용할수 있는 backpressure 를 이용하는 방법의 샘플 코드를 akka stream을 이용한 샘플코드를 작성해 스칼라/자바/코틀린 버전으로 각각 작성


✅ 전제 시나리오

  • 외부 LLM API를 호출 (느린 응답 가능성 있음)

  • 요청은 큐 또는 스트림으로 연속적으로 들어옴

  • Akka Stream이 처리 속도를 자동 조절 (Backpressure)


✅ 1. Scala 버전 (Akka Streams)

Code Block
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random

object LlmBackpressureScala extends App {
  implicit val system = ActorSystem("llm-system")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  // 가상의 LLM API 호출 (느린 호출)
  def callLLM(input: String): Future[String] = Future {
    Thread.sleep(Random.nextInt(1000)) // 임의의 지연
    s"LLM Response for [$input]"
  }

  val stream = Source(1 to 100)
    .throttle(5, 1.second) // 입력 제한
    .map(i => s"input-$i")
    .via(Flow[String].mapAsync(parallelism = 4)(callLLM)) // backpressure 적용
    .runWith(Sink.foreach(resp => println(s"[LLM] $resp")))

  stream.onComplete(_ => system.terminate())
}



✅ 2. Java 버전 (Akka Streams)

Code Block
import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.*;
import scala.concurrent.duration.Duration;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

public class LlmBackpressureJava {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("llm-system");
        ActorMaterializer materializer = ActorMaterializer.create(system);

        Flow<String, String, NotUsed> llmFlow = Flow.of(String.class)
            .mapAsync(4, input -> callLLM(input));

        Source.from(IntStream.rangeClosed(1, 100).boxed().toList())
            .throttle(5, Duration.create(1, "second"))
            .map(i -> "input-" + i)
            .via(llmFlow)
            .runWith(Sink.foreach(resp -> System.out.println("[LLM] " + resp)), materializer)
            .whenComplete((done, exc) -> system.terminate());
    }

    static CompletionStage<String> callLLM(String input) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
            } catch (InterruptedException ignored) {}
            return "LLM Response for [" + input + "]";
        });
    }
}



✅ 3. Kotlin 버전 (Akka Streams + Kotlin DSL)

Code Block
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.*
import scala.concurrent.duration.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadLocalRandom

fun main() {
    val system = ActorSystem.create("llm-system")
    val materializer = ActorMaterializer.create(system)

    val llmFlow = Flow.of(String::class.java)
        .mapAsync(4) { input -> callLLM(input) }

    val inputList = (1..100).map { "input-$it" }

    Source.from(inputList)
        .throttle(5, Duration.create(1, "second"))
        .via(llmFlow)
        .runWith(Sink.foreach { println("[LLM] $it") }, materializer)
        .whenComplete { _, _ -> system.terminate() }
}

fun callLLM(input: String): CompletableFuture<String> {
    return CompletableFuture.supplyAsync {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1000))
        "LLM Response for [$input]"
    }
}



✅ 핵심 요점 (적용 시 주의사항)


항목설명
mapAsync(parallelism = N)N개의 요청만 동시에 실행 (backpressure 핵심)
throttle외부 API 보호를 위한 속도 제한
callLLM실제 HTTP 클라이언트로 대체 가능 (WebClient, OkHttp 등)
Sink.foreach결과 소비자 (후처리 로직 삽입 가능)
Code Block
themeEmacs
callLLM 을 HuggingFace API 호출한버전?



Code Block
themeEmacs
def callLLM(input: String): Future[String] = {
  import scala.concurrent.ExecutionContext.Implicits.global
  import sttp.client3._
  import sttp.client3.circe._
  import io.circe._, io.circe.parser._, io.circe.syntax._

  val backend = HttpURLConnectionBackend()
  val token = "<YOUR_HF_TOKEN>"
  val model = "gpt2"

  val bodyJson = Map("inputs" -> input).asJson.noSpaces

  Future {
    val request = basicRequest
      .post(uri"https://api-inference.huggingface.co/models/$model")
      .header("Authorization", s"Bearer $token")
      .body(bodyJson)
      .send(backend)

    val responseBody = request.body match {
      case Right(json) => parse(json).flatMap(_.hcursor.downArray.get[String]("generated_text")).getOrElse("Parse failed")
      case Left(err)   => s"Error: $err"
    }

    responseBody
  }
}


✅ 예시 입력/출력

  • 입력: "Translate this sentence into French: Hello, how are you?"

  • 출력: "Bonjour, comment ça va ?"

✅ 주의사항


항목설명
API 호출 속도HuggingFace는 무료 요금제에서 제한적 TPS 허용
모델 로딩 지연모델 첫 호출 시 cold-start 수초 소요 가능
JSON 파싱일부 모델은 generated_text 대신 다른 필드를 반환할 수 있음
에러 처리JSON parse 오류, 5xx 에러 등에 대비 필요
Info

cold-start 에서 점진적으로 사용량을 높여주는것에 대비해 트래픽대비 호출성공률이 높은 backpresure 의 세부적인 전략 코드작성

훌륭한 요구입니다.
LLM API의 cold-start 문제를 고려해, 트래픽이 증가할 때도 안정적인 응답률을 유지하는 backpressure 전략을 구현하려면 다음과 같은 점진 증가 처리 전략이 필요합니다:

...

✅ 요구 목표 요약

  • LLM API는 초기 호출 시 cold-start로 느릴 수 있음 (특히 HuggingFace)

  • 트래픽이 갑자기 몰리면 실패 확률 증가

  • 해결 전략: 점진적 호출 증가 + backpressure + timeout + retry + circuit breaker

...

✅ 전략 구성


전략설명
Dynamic Throttle시작 시 낮은 호출 빈도 → 점진적으로 증가
Backpressure via mapAsync병렬 처리량을 제한
Retry with exponential backoff실패 시 재시도 (단, 부하를 고려한 시간 간격 필요)
Circuit Breaker (optional)일정 실패율 이상이면 일정 시간 동안 차단
Timeout 설정응답 지연으로 인해 전체 흐름이 멈추지 않도록 설정


✅ 예제 코드 (Scala + Akka Streams 기준)

Code Block
themeEmacs
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.Random

object LlmBackpressureStrategy extends App {
  implicit val system = ActorSystem("llm-coldstart")
  implicit val ec: ExecutionContext = system.dispatcher
  implicit val mat = ActorMaterializer()

  val totalRequests = 100
  val baseDelay = 500.millis

  // 동적으로 호출 빈도를 점진적으로 증가시키는 Source
  val rampUpSource = Source(1 to totalRequests)
    .throttle(1, baseDelay, 1, ThrottleMode.Shaping) // 초기에는 1개/0.5초
    .via(rampUpFlow(10, 100.millis)) // 점진적으로 빠르게

  // 점진적 호출 속도 증가를 위한 커스텀 Flow
  def rampUpFlow(steps: Int, stepInterval: FiniteDuration): Flow[Int, Int, _] = {
    Flow[Int].throttle(1, stepInterval).conflateWithSeed(i => (i, 1)) {
      case ((_, cnt), newValue) => (newValue, cnt + 1)
    }.map(_._1)
  }

  // LLM API 호출 (시뮬레이션)
  def callLLM(input: String): Future[String] = {
    val delay = if (Random.nextDouble() < 0.2) 3000 else 300 // 일부 cold start 발생
    Future {
      Thread.sleep(delay)
      if (Random.nextDouble() < 0.1) throw new RuntimeException("Simulated failure")
      s"Response for [$input] after $delay ms"
    }
  }

  // Timeout 처리 + Retry
  def withRetryAndTimeout(input: String, retries: Int): Future[String] = {
    val timeoutFuture = after(2.seconds, system.scheduler)(Future.failed(new TimeoutException("Timeout")))
    val attempt = Future.firstCompletedOf(Seq(callLLM(input), timeoutFuture))

    attempt.recoverWith {
      case _ if retries > 0 =>
        println(s"[Retrying] $input")
        after(300.millis, system.scheduler)(withRetryAndTimeout(input, retries - 1))
    }
  }

  rampUpSource
    .map(i => s"input-$i")
    .via(Flow[String].mapAsync(parallelism = 4)(input => withRetryAndTimeout(input, 2)))
    .runWith(Sink.foreach(result => println(s"[OK] $result")))
    .onComplete { _ =>
      println("Stream finished.")
      system.terminate()
    }
}


✅ 주요 설명


요소설명
throttle + conflate초기에는 느리게, 점진적으로 호출 속도 증가
mapAsync(parallelism = N)N개 동시 호출 제한 = backpressure
withRetryAndTimeout실패/지연에 대비해 timeout + retry 구성
Future.firstCompletedOf일정 시간 안에 응답 없으면 timeout 처리
after(delay)재시도/딜레이를 위한 akka.pattern.after 사용

...

✅ 확장 전략

  • Circuit Breaker:
    Akka CircuitBreaker로 일정 실패율 이상이면 일정 시간 동안 차단 가능

  • 메트릭 기반 트래픽 조정:
    Prometheus/StatsD로 평균 응답시간 모니터링 → throttle() 속도 조정