Page History
...
| Info |
|---|
리액티브 스트림을 반영해 지능형 AI 서비스 아키텍처를 보강해죠 |
보너스 - Reactive Stream 버전으로 LLM호출하기
| Code Block | ||
|---|---|---|
| ||
LLM 호출 및 AI 모듈과 연결할 때, LLM을 안정적이게 사용할수 있는 backpressure 를 이용하는 방법의 샘플 코드를 akka stream을 이용한 샘플코드를 작성해 스칼라/자바/코틀린 버전으로 각각 작성 |
...
| Code Block |
|---|
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.javadsl.*
import scala.concurrent.duration.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ThreadLocalRandom
fun main() {
val system = ActorSystem.create("llm-system")
val materializer = ActorMaterializer.create(system)
val llmFlow = Flow.of(String::class.java)
.mapAsync(4) { input -> callLLM(input) }
val inputList = (1..100).map { "input-$it" }
Source.from(inputList)
.throttle(5, Duration.create(1, "second"))
.via(llmFlow)
.runWith(Sink.foreach { println("[LLM] $it") }, materializer)
.whenComplete { _, _ -> system.terminate() }
}
fun callLLM(input: String): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
Thread.sleep(ThreadLocalRandom.current().nextInt(1000))
"LLM Response for [$input]"
}
}
|
✅ 핵심 요점 (적용 시 주의사항)
| 항목 | 설명 |
|---|---|
mapAsync(parallelism = N) | N개의 요청만 동시에 실행 (backpressure 핵심) |
throttle | 외부 API 보호를 위한 속도 제한 |
callLLM | 실제 HTTP 클라이언트로 대체 가능 (WebClient, OkHttp 등) |
Sink.foreach | 결과 소비자 (후처리 로직 삽입 가능) |
| Code Block | ||
|---|---|---|
| ||
callLLM 을 HuggingFace API 호출한버전? |
...
입력:
"Translate this sentence into French: Hello, how are you?"출력:
"Bonjour, comment ça va ?"
✅ 주의사항
| 항목 | 설명 |
|---|---|
| API 호출 속도 | HuggingFace는 무료 요금제에서 제한적 TPS 허용 |
| 모델 로딩 지연 | 모델 첫 호출 시 cold-start 수초 소요 가능 |
| JSON 파싱 | 일부 모델은 generated_text 대신 다른 필드를 반환할 수 있음 |
| 에러 처리 | JSON parse 오류, 5xx 에러 등에 대비 필요 |
| Info |
|---|
cold-start 에서 점진적으로 사용량을 높여주는것에 대비해 트래픽대비 호출성공률이 높은 backpresure 의 세부적인 전략 코드작성 |
훌륭한 요구입니다.
LLM API의 cold-start 문제를 고려해, 트래픽이 증가할 때도 안정적인 응답률을 유지하는 backpressure 전략을 구현하려면 다음과 같은 점진 증가 처리 전략이 필요합니다:
...
✅ 요구 목표 요약
LLM API는 초기 호출 시 cold-start로 느릴 수 있음 (특히 HuggingFace)
트래픽이 갑자기 몰리면 실패 확률 증가
해결 전략: 점진적 호출 증가 + backpressure + timeout + retry + circuit breaker
...
✅ 전략 구성
| 전략 | 설명 |
|---|---|
| Dynamic Throttle | 시작 시 낮은 호출 빈도 → 점진적으로 증가 |
Backpressure via mapAsync | 병렬 처리량을 제한 |
| Retry with exponential backoff | 실패 시 재시도 (단, 부하를 고려한 시간 간격 필요) |
| Circuit Breaker (optional) | 일정 실패율 이상이면 일정 시간 동안 차단 |
| Timeout 설정 | 응답 지연으로 인해 전체 흐름이 멈추지 않도록 설정 |
✅ 예제 코드 (Scala + Akka Streams 기준)
| Code Block | ||
|---|---|---|
| ||
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.pattern.after
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.Random
object LlmBackpressureStrategy extends App {
implicit val system = ActorSystem("llm-coldstart")
implicit val ec: ExecutionContext = system.dispatcher
implicit val mat = ActorMaterializer()
val totalRequests = 100
val baseDelay = 500.millis
// 동적으로 호출 빈도를 점진적으로 증가시키는 Source
val rampUpSource = Source(1 to totalRequests)
.throttle(1, baseDelay, 1, ThrottleMode.Shaping) // 초기에는 1개/0.5초
.via(rampUpFlow(10, 100.millis)) // 점진적으로 빠르게
// 점진적 호출 속도 증가를 위한 커스텀 Flow
def rampUpFlow(steps: Int, stepInterval: FiniteDuration): Flow[Int, Int, _] = {
Flow[Int].throttle(1, stepInterval).conflateWithSeed(i => (i, 1)) {
case ((_, cnt), newValue) => (newValue, cnt + 1)
}.map(_._1)
}
// LLM API 호출 (시뮬레이션)
def callLLM(input: String): Future[String] = {
val delay = if (Random.nextDouble() < 0.2) 3000 else 300 // 일부 cold start 발생
Future {
Thread.sleep(delay)
if (Random.nextDouble() < 0.1) throw new RuntimeException("Simulated failure")
s"Response for [$input] after $delay ms"
}
}
// Timeout 처리 + Retry
def withRetryAndTimeout(input: String, retries: Int): Future[String] = {
val timeoutFuture = after(2.seconds, system.scheduler)(Future.failed(new TimeoutException("Timeout")))
val attempt = Future.firstCompletedOf(Seq(callLLM(input), timeoutFuture))
attempt.recoverWith {
case _ if retries > 0 =>
println(s"[Retrying] $input")
after(300.millis, system.scheduler)(withRetryAndTimeout(input, retries - 1))
}
}
rampUpSource
.map(i => s"input-$i")
.via(Flow[String].mapAsync(parallelism = 4)(input => withRetryAndTimeout(input, 2)))
.runWith(Sink.foreach(result => println(s"[OK] $result")))
.onComplete { _ =>
println("Stream finished.")
system.terminate()
}
}
|
✅ 주요 설명
| 요소 | 설명 |
|---|---|
throttle + conflate | 초기에는 느리게, 점진적으로 호출 속도 증가 |
mapAsync(parallelism = N) | N개 동시 호출 제한 = backpressure |
withRetryAndTimeout | 실패/지연에 대비해 timeout + retry 구성 |
Future.firstCompletedOf | 일정 시간 안에 응답 없으면 timeout 처리 |
after(delay) | 재시도/딜레이를 위한 akka.pattern.after 사용 |
...
✅ 확장 전략
Circuit Breaker:
Akka CircuitBreaker로 일정 실패율 이상이면 일정 시간 동안 차단 가능메트릭 기반 트래픽 조정:
Prometheus/StatsD로 평균 응답시간 모니터링 →throttle()속도 조정
