최근 업데이트된 카페24 API 호출제약 스펙가이드에 따라 액터모델을 이용해 안전하게 호출처리하는 심플 모듈 작성을 

사전에 준비된 스킬모드를 이용해 구현시도를 해보겠습니다.


카페24 API SPEC


공개호스트 - OpenHostService

이 프로토콜은 공개가 되어있고 이용하기위한 문서화가 잘되어 있으며 이것을 이용하는 팀은 상류 API를 그대로 이용을 해야합니다.


페이스북의 공개된 API를 이용하여 로그인 기능을 만들거나 정부 OpenAPI를 이용하여 기능을 개발하는경우

개발팀은 제공되는 API를 그대로 준수하고 따릅니다. 상류 제공 API팀은 전체 의견을 수렴해 기능개선을 할수도 있지만

특정팀의 의견을 위해 기능변경을 하지 않습니다.

순응적 패턴

언제 무엇을 받게될지 상류(Upstream)인 공급자(Provider)가 주로 결정을 하지고 이것을 제공받는 하류(DownStream)팀은 상류 스트림을 준수합니다.  공개호스트와는 다르게 하류팀의 요구사항을 받아들이기도합니다.

Cafe24 API의 경우 대부분 공개호스트이지만, 협약되어 제공되는 API가 있을수 있습니다. 협약관계 또는 상류의 기능이 오류가있거나 개선이 필요할시 공급받는 다운스트림에서 개선요청을 할수도 있지만 대부분 수용해 순응합니다.

more : 컨텍스트 맵핑과 API





API 요청 수 제한 정책

물을 담는 구멍 난 양동이를 상상해보세요.

흐름

  1. 요청이 들어오면 → 버킷에 저장

  2. 버킷은 일정한 속도로만 요청을 처리

  1. 입구에서 유체가 들어옵니다.

  2. 압력이 낮으면 → 밸브가 열려서 정상 흐름 유지

  3. 압력이 높아지면 →

  4. 결과적으로 입구 쪽 압력이 올라가면서 유입이 줄어듭니다



Leaky Bucket은
넘치면 잘라버리는 방식(차단)

Backpressure는
넘치기 전에 속도를 조절하는 방식(제어)

즉,
❌ 버리는 게 아니라
✅ 흐름을 조절합니다.





카페24 API의 호출제약 정책에 의해.. 안전하게 호출할수 있는 장치를 구현하려고 합니다.

# 활용 스킬
다음 스킬을 조회해 활용가능한지 체크후 스킬을 이용해주세요
스킬조회및 인식이 안되는경우 진행하지말고 멈춰주세요
- java-akka-classic
- java-akka-classic-test

# 프로젝트 지침
- 프로젝트 생성 위치 : skill-test/projects/sample22
- 실제 cafe24 API를 호출하지는 않으며...Leaky bucket 이 적용해 제약응답을하는 API를 더미API로 작성합니다. 더미 API는 테스트 장치로 이용될예정으로 로컬호출가능한 API로 작성해주세요
    - API의 본문응답은 요청시 hello 인경우... 응답으로 world를 반환하고... 나머지 단어는 그대로 똑같이 응답으로 돌려주게 작성  
    - skill-test/docs/카페24API호출제약.md : 성능은 헤더값을 통해 응답이 되며 이값을 참고 해 백프레셔의 압력값으로 이용
    - 1노드 테스트이기때문에 Leaky bucket은 적절하게 처리량을 다소 적은수준을 이용합니다. (성능수치 조절가능)
- 백프레셔 기법(AkkaStream장치이용)을 이용해 cafe24 api를 안전하게 호출하는 장치생성 재활용가능한 모델로 작성
  - API를 안전하게 호출해주는 액터모델을 작성.. 이것을 이용하는 대상은 완료된 응답값을 받는것에만 집중을 하며... 호출제약(백프레셔,TPS등)은 관심이 없습니다.
  - AkksStream과 액터모델을 상호 설계해 호출블락킹이아닌 스트림모드(유압장치)를 활용 블락킹을 최소화하는 방법으로 구현합니다. 
    
# 테스트 지침
- 유닛테스트를 필수로 작성, 이 샘플프로젝트가 목적하고자하는 시나리오로 작성하고 검증해주세요 

# 테스트 완료후 문서지침
- skill-test/result/TEST-Cafe24APILimit-RESULT.md : 테스트 수행후 결과 정리
- skill-test/result/TEST-Cafe24APILimit-GUIDE.md : 이 수행결과로 생성된 장치이용에 대한 가이드






# TEST-Cafe24APILimit 테스트 결과

## 테스트 개요

카페24 API의 Leaky Bucket 호출 제약 정책에 대응하여, Akka Streams 백프레셔를 활용한 안전 호출 장치를 구현하고 검증한 결과입니다.

- **프로젝트 위치**: `skill-test/projects/sample22`
- **사용 스킬**: `java-akka-classic`, `java-akka-classic-test`
- **플랫폼**: Java 23 + Akka Classic 2.7.0 + Akka Streams
- **테스트 일자**: 2026-02-28

## 더미 서버 설정 (1노드 테스트)

| 항목 | 값 | 설명 |
|------|-----|------|
| Bucket Capacity | 10 | 최대 동시 요청 수 |
| Leak Rate | 2/sec | 초당 버킷 감소량 |
| 안전 호출 TPS | 2/sec | 스트림 throttle 설정값 |

## 테스트 결과

### 전체: 5건 PASSED / 0건 FAILED

| # | 테스트명 | 결과 | 비고 |
|---|---------|------|------|
| 1 | hello → world 변환 | PASSED | 요청 "hello" → 응답 "world" 정상 |
| 2 | 일반 단어 에코 | PASSED | 요청 "akka-stream" → 응답 "akka-stream" 정상 |
| 3 | 직접호출 429 발생 확인 | PASSED | 20건 동시 → 200: 10건, 429: **10건** |
| 4 | 백프레셔 burst 안전 처리 | PASSED | 15건 burst → 200: **15건**, 429: 0건 |
| 5 | 버킷 상태 헤더 검증 | PASSED | bucketUsed/bucketMax 정상 포함 |

### 핵심 비교: 직접 호출 vs 백프레셔 적용

```
[직접호출 - 백프레셔 미적용]
  총 20건 동시 요청 → 200: 10건, 429: 10건 (50% 실패)
  버킷 capacity(10) 초과로 절반이 거부됨

[백프레셔 적용 - SafeApiCallerActor]
  총 15건 burst 요청 → 200: 15건, 429: 0건 (100% 성공)
  버킷 사용량: 1~3 수준으로 안정적 유지
```

### 백프레셔 적용 시 버킷 사용량 추이

```
safe-0  → bucket=1/10
safe-1  → bucket=2/10
safe-2  → bucket=3/10
safe-3  → bucket=2/10  ← leak으로 감소
safe-4  → bucket=3/10
safe-5  → bucket=2/10  ← leak으로 감소
safe-6  → bucket=3/10
...반복 패턴 (2~3 수준 유지)
```

throttle(2 req/sec)과 서버 leak rate(2/sec)가 균형을 이루어 버킷이 안전 범위(1~3) 내에서 안정적으로 유지됩니다.

## 아키텍처 구성

```
[사용자] → ApiRequest → [SafeApiCallerActor]
                              │
                    ┌─────────▼──────────┐
                    │  Akka Streams 파이프라인  │
                    │                    │
                    │  Source.actorRef    │  ← 유입 밸브 (buffer=100)
                    │       │            │
                    │  throttle(2/s)     │  ← 유량 조절기
                    │       │            │
                    │  mapAsync(1)       │  ← 비동기 HTTP 펌프
                    │  + 적응형 지연      │     (429 자동 재시도)
                    │       │            │
                    │  Sink.foreach      │  ← 응답 배출구
                    └───────│────────────┘
                            ▼
[사용자] ← ApiResponse ← replyTo.tell()
```

## 생성된 파일

| 파일 | 역할 |
|------|------|
| `message/Messages.java` | ApiRequest, ApiResponse, StreamEnvelope, StreamResult 메시지 정의 |
| `api/DummyCafe24Server.java` | Leaky Bucket 더미 API 서버 (JDK HttpServer 기반) |
| `actor/SafeApiCallerActor.java` | 백프레셔 적용 안전 호출 액터 (재사용 가능) |
| `SafeApiCallerTest.java` | 5건 유닛테스트 (TestKit 기반) |




# Cafe24 API 안전 호출 장치 사용 가이드

## 개요

카페24 API의 Leaky Bucket 호출 제약 정책에 대응하는 **Akka Streams 기반 백프레셔 안전 호출 장치**입니다. 사용자는 API 호출 제약(TPS, 429 에러, 버킷 상태)을 전혀 신경 쓰지 않고, 요청을 보내고 결과만 받으면 됩니다.

## 핵심 컴포넌트

### SafeApiCallerActor

호출 제약을 내부적으로 처리하는 재사용 가능한 액터입니다.

```java
// 생성: API 기본 URL + 초당 최대 호출 수 설정
ActorRef caller = system.actorOf(
    SafeApiCallerActor.props("https://api.cafe24.com", 2),
    "cafe24-api-caller"
);

// 요청: 단어와 응답 받을 ActorRef 전달
caller.tell(new ApiRequest("hello", myActorRef), ActorRef.noSender());

// 응답: ApiResponse 메시지로 수신
// - word: 원본 요청 단어
// - result: API 응답 본문
// - statusCode: HTTP 상태 코드
// - bucketUsed/bucketMax: 버킷 상태
```

### 내부 동작 (유압장치 비유)

```
요청 투입 ──→ [버퍼 탱크] ──→ [유량 조절 밸브] ──→ [비동기 펌프] ──→ 응답 배출
  (tell)     (100건 버퍼)   (throttle 2/s)   (mapAsync HTTP)   (replyTo.tell)
                                                    │
                                              [압력 게이지]
                                         (X-Api-Call-Limit 헤더 읽기)
                                                    │
                                          사용률 > 80%: +500ms 지연
                                          사용률 > 50%: +200ms 지연
                                          429 응답: 대기 후 재시도
```

## 사용 방법

### 1. 기본 사용법 (단일 API)

```java
// ActorSystem 초기화
ActorSystem system = ActorSystem.create("my-system");

// 안전 호출 장치 생성 (초당 2건 제한)
ActorRef apiCaller = system.actorOf(
    SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2),
    "product-api"
);

// 요청 보내기 (호출 제약은 장치가 알아서 처리)
apiCaller.tell(new ApiRequest("products/count", self()), self());

// 응답 처리
receive(ApiResponse.class, response -> {
    System.out.println("결과: " + response.result());
    System.out.println("버킷 상태: " + response.bucketUsed() + "/" + response.bucketMax());
});
```

### 2. 다중 API 엔드포인트 (장치 재사용)

```java
// 서로 다른 API 엔드포인트에 독립적인 안전 호출 장치 생성
ActorRef productApi = system.actorOf(
    SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/products", 2),
    "product-api"
);

ActorRef orderApi = system.actorOf(
    SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/orders", 2),
    "order-api"
);

// 각 장치는 독립적으로 백프레셔 적용
productApi.tell(new ApiRequest("count", self()), self());
orderApi.tell(new ApiRequest("list", self()), self());
```

### 3. 대량 처리 (burst 안전)

```java
// 100건의 요청을 한번에 투입해도 안전
for (int i = 0; i < 100; i++) {
    apiCaller.tell(new ApiRequest("item-" + i, self()), self());
}
// → throttle이 초당 2건씩 처리
// → 버킷 오버플로 없이 전부 성공
// → 약 50초 후 모든 응답 수신
```

## 성능 튜닝

### throttle 설정 가이드

| Cafe24 버킷 설정 | 권장 throttle | 안전 마진 |
|-----------------|--------------|----------|
| capacity=40, leak=2/s | `maxRequestsPerSecond=2` | 최대 안전 (버킷 거의 미사용) |
| capacity=40, leak=2/s | `maxRequestsPerSecond=3` | 적절 (적응형 지연으로 보완) |
| capacity=40, leak=2/s | `maxRequestsPerSecond=5` | 공격적 (적응형 지연 의존) |

**권장**: `maxRequestsPerSecond <= leak rate`로 설정하면 버킷이 거의 차지 않아 가장 안전합니다.

### 버퍼 크기 조절

```java
// Source.actorRef의 버퍼 크기 (현재 100)
// 대량 burst가 예상되면 증가 가능
Source.<StreamEnvelope>actorRef(500, OverflowStrategy.dropNew())
```

| 버퍼 크기 | 용도 |
|----------|------|
| 100 | 일반 사용 (기본값) |
| 500 | 대량 burst 허용 |
| 1000 | 극대량 처리 |

### OverflowStrategy 선택

| 전략 | 설명 | 권장 상황 |
|------|------|----------|
| `dropNew()` | 버퍼 초과 시 새 요청 폐기 | 최신 요청 우선 (현재 설정) |
| `dropHead()` | 가장 오래된 요청 폐기 | 최신 요청 우선 |
| `backpressure()` | upstream에 압력 전파 | Source.queue 사용 시 |
| `fail()` | 스트림 실패 | 데이터 손실 불허 시 |

## 적응형 백프레셔 상세

SafeApiCallerActor는 API 응답 헤더를 읽어 동적으로 호출 속도를 조절합니다:

```
X-Api-Call-Limit: 8/10  → 사용률 80% → +500ms 지연 추가
X-Api-Call-Limit: 5/10  → 사용률 50% → +200ms 지연 추가
X-Api-Call-Limit: 3/10  → 사용률 30% → 추가 지연 없음
HTTP 429 응답           → X-Cafe24-Call-Remain 초만큼 대기 후 재시도 (최대 3회)
```

이 메커니즘은 throttle의 기본 유량 제한 위에 추가로 작동하여 이중 안전장치 역할을 합니다.

## 실제 Cafe24 API 적용 시 변경 사항

1. **DummyCafe24Server 제거**: 실제 API URL로 교체
2. **인증 헤더 추가**: `HttpRequest.newBuilder().header("Authorization", "Bearer " + token)`
3. **요청/응답 포맷 변경**: Messages.java의 ApiRequest/ApiResponse를 실제 API 스키마에 맞게 확장
4. **throttle 값 조정**: 실제 Cafe24 정책(capacity=40, leak=2/s)에 맞게 설정

```java
// 실제 적용 예시
ActorRef caller = system.actorOf(
    SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2),
    "cafe24-safe-caller"
);
```

## 의존성

```groovy
dependencies {
    implementation "com.typesafe.akka:akka-actor_2.13:2.7.0"
    implementation "com.typesafe.akka:akka-stream_2.13:2.7.0"
}
```



생성된코드

package com.example.cafe24.api;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Cafe24 API를 시뮬레이션하는 더미 서버.
 *
 * <p>Leaky Bucket 알고리즘으로 요청 수를 제한합니다.
 * <ul>
 *   <li>버킷 용량(capacity)을 초과하면 429 Too Many Requests 반환</li>
 *   <li>버킷은 매초 leakRate만큼 자동 감소</li>
 *   <li>응답 헤더: X-Api-Call-Limit, X-Cafe24-Call-Usage, X-Cafe24-Call-Remain</li>
 * </ul>
 *
 * <p>응답 규칙: "hello" 요청 → "world" 반환, 나머지 단어는 그대로 에코.
 */
public class DummyCafe24Server {

    private final HttpServer server;
    private final int bucketCapacity;
    private final int leakRatePerSecond;
    private final AtomicInteger currentBucketLevel = new AtomicInteger(0);
    private final ScheduledExecutorService leakScheduler;

    public DummyCafe24Server(int port, int bucketCapacity, int leakRatePerSecond) throws IOException {
        this.bucketCapacity = bucketCapacity;
        this.leakRatePerSecond = leakRatePerSecond;

        // Leaky Bucket 드레인: 매초 leakRate만큼 버킷 수위 감소
        this.leakScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "leaky-bucket-drain");
            t.setDaemon(true);
            return t;
        });
        this.leakScheduler.scheduleAtFixedRate(
                () -> currentBucketLevel.updateAndGet(c -> Math.max(0, c - leakRatePerSecond)),
                1, 1, TimeUnit.SECONDS
        );

        this.server = HttpServer.create(new InetSocketAddress(port), 0);
        this.server.createContext("/api/echo", this::handleEcho);
        this.server.setExecutor(Executors.newFixedThreadPool(4));
    }

    public void start() {
        server.start();
    }

    public void stop() {
        server.stop(0);
        leakScheduler.shutdown();
    }

    /** 테스트 간 버킷 상태 초기화 */
    public void reset() {
        currentBucketLevel.set(0);
    }

    public int getCurrentBucketLevel() {
        return currentBucketLevel.get();
    }

    public int getBucketCapacity() {
        return bucketCapacity;
    }

    private void handleEcho(HttpExchange exchange) throws IOException {
        if (!"GET".equals(exchange.getRequestMethod())) {
            exchange.sendResponseHeaders(405, -1);
            return;
        }

        int level = currentBucketLevel.incrementAndGet();

        if (level > bucketCapacity) {
            // 버킷 초과 - 429 반환 (실제 소비하지 않으므로 감소)
            currentBucketLevel.decrementAndGet();

            double usagePercent = ((double) level / bucketCapacity) * 100;
            int remainSeconds = (int) Math.ceil((double) (level - bucketCapacity) / leakRatePerSecond) + 1;

            exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity);
            exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent));
            exchange.getResponseHeaders().add("X-Cafe24-Call-Remain", String.valueOf(remainSeconds));

            byte[] body = "Too Many Requests".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(429, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
            return;
        }

        // 파라미터 파싱
        String word = parseWordParam(exchange.getRequestURI().getQuery());

        // "hello" → "world", 나머지는 그대로 에코
        String result = "hello".equals(word) ? "world" : word;

        // 응답 헤더 설정
        double usagePercent = ((double) level / bucketCapacity) * 100;
        exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity);
        exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent));
        exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");

        byte[] responseBytes = result.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(200, responseBytes.length);
        try (OutputStream os = exchange.getResponseBody()) {
            os.write(responseBytes);
        }
    }

    private String parseWordParam(String query) {
        if (query == null) return "";
        for (String param : query.split("&")) {
            String[] pair = param.split("=", 2);
            if (pair.length == 2 && "word".equals(pair[0])) {
                return URLDecoder.decode(pair[1], StandardCharsets.UTF_8);
            }
        }
        return "";
    }
}



package com.example.cafe24.actor;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.stream.OverflowStrategy;
import akka.stream.SystemMaterializer;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import com.example.cafe24.message.Messages.StreamEnvelope;
import com.example.cafe24.message.Messages.StreamResult;
import scala.concurrent.duration.FiniteDuration;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

/**
 * Cafe24 API를 백프레셔를 적용하여 안전하게 호출하는 액터.
 *
 * <p>내부에 Akka Streams 파이프라인(유압장치)을 구성하여:
 * <ul>
 *   <li>Source.actorRef: 요청 유입구 (밸브)</li>
 *   <li>throttle: 유량 조절기 (초당 maxRequestsPerSecond 제한)</li>
 *   <li>mapAsync: 비동기 HTTP 호출 펌프 (블로킹 없음)</li>
 *   <li>Sink.foreach: 응답 배출구 (원래 요청자에게 전달)</li>
 * </ul>
 *
 * <p>사용자는 {@link ApiRequest}를 보내고 {@link ApiResponse}를 받기만 하면 됩니다.
 * 호출 제약(백프레셔, TPS, 429 재시도)은 이 액터가 모두 처리합니다.
 *
 * <p><b>재사용 방법</b>: apiBaseUrl과 maxRequestsPerSecond를 설정하여
 * 다양한 Cafe24 API 엔드포인트에 대해 독립적인 안전 호출 장치를 생성할 수 있습니다.
 */
public class SafeApiCallerActor extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final String apiBaseUrl;
    private final int maxRequestsPerSecond;
    private final HttpClient httpClient;
    private ActorRef streamEntry;

    public SafeApiCallerActor(String apiBaseUrl, int maxRequestsPerSecond) {
        this.apiBaseUrl = apiBaseUrl;
        this.maxRequestsPerSecond = maxRequestsPerSecond;
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
    }

    public static Props props(String apiBaseUrl, int maxRequestsPerSecond) {
        return Props.create(SafeApiCallerActor.class, apiBaseUrl, maxRequestsPerSecond);
    }

    @Override
    public void preStart() {
        var materializer = SystemMaterializer.get(getContext().getSystem()).materializer();

        // Akka Streams 백프레셔 파이프라인 구성
        // Source.actorRef(유입) → throttle(유량제한) → mapAsync(비동기호출) → Sink(응답전달)
        streamEntry = Source.<StreamEnvelope>actorRef(100, OverflowStrategy.dropNew())
                .throttle(
                        maxRequestsPerSecond,
                        FiniteDuration.create(1, TimeUnit.SECONDS),
                        maxRequestsPerSecond,
                        ThrottleMode.shaping()
                )
                .mapAsync(1, this::callApiWithAdaptiveBackpressure)
                .to(Sink.foreach(this::deliverResponse))
                .run(materializer);

        log.info("SafeApiCallerActor 초기화 완료 - baseUrl={}, maxRps={}", apiBaseUrl, maxRequestsPerSecond);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(ApiRequest.class, request -> {
                    // 요청을 스트림 파이프라인에 투입 (논블로킹)
                    streamEntry.tell(new StreamEnvelope(request), getSelf());
                })
                .build();
    }

    /**
     * 적응형 백프레셔를 적용한 API 호출.
     * 응답 헤더의 버킷 사용률에 따라 추가 지연을 동적으로 적용합니다.
     */
    private CompletionStage<StreamResult> callApiWithAdaptiveBackpressure(StreamEnvelope envelope) {
        return executeHttpCall(envelope, 0)
                .exceptionally(ex -> {
                    log.error("API 호출 실패: {}", ex.getMessage());
                    return new StreamResult(
                            envelope.request(),
                            new ApiResponse(envelope.request().word(), "ERROR: " + ex.getMessage(), 500, 0, 0)
                    );
                });
    }

    private CompletionStage<StreamResult> executeHttpCall(StreamEnvelope envelope, int retryCount) {
        String word = envelope.request().word();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(apiBaseUrl + "/api/echo?word=" + word))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();

        return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenCompose(response -> {
                    int statusCode = response.statusCode();

                    if (statusCode == 429 && retryCount < 3) {
                        // Rate limited - X-Cafe24-Call-Remain 헤더 기반 대기 후 재시도
                        String remainStr = response.headers()
                                .firstValue("X-Cafe24-Call-Remain").orElse("1");
                        int waitSeconds = Integer.parseInt(remainStr);
                        log.warning("429 응답 수신 - {}초 후 재시도 (retry={})", waitSeconds, retryCount + 1);

                        return delay(waitSeconds * 1000L)
                                .thenCompose(v -> executeHttpCall(envelope, retryCount + 1));
                    }

                    // 응답 헤더에서 버킷 상태 파싱
                    String callLimit = response.headers()
                            .firstValue("X-Api-Call-Limit").orElse("0/10");
                    String[] parts = callLimit.split("/");
                    int bucketUsed = Integer.parseInt(parts[0]);
                    int bucketMax = Integer.parseInt(parts[1]);
                    double usageRatio = (double) bucketUsed / bucketMax;

                    StreamResult result = new StreamResult(
                            envelope.request(),
                            new ApiResponse(word, response.body(), statusCode, bucketUsed, bucketMax)
                    );

                    // 적응형 백프레셔: 버킷 사용률 기반 추가 지연
                    // 유압장치의 압력 게이지를 읽고 밸브를 조절하는 것과 같은 원리
                    long adaptiveDelayMs = 0;
                    if (usageRatio > 0.8) {
                        adaptiveDelayMs = 500;
                        log.info("버킷 사용률 {}% - 500ms 추가 지연 적용", (int) (usageRatio * 100));
                    } else if (usageRatio > 0.5) {
                        adaptiveDelayMs = 200;
                    }

                    if (adaptiveDelayMs > 0) {
                        return delay(adaptiveDelayMs).thenApply(v -> result);
                    }
                    return CompletableFuture.completedFuture(result);
                });
    }

    /** 스트림 출구: 완료된 응답을 원래 요청자에게 전달 */
    private void deliverResponse(StreamResult result) {
        ActorRef replyTo = result.originalRequest().replyTo();
        replyTo.tell(result.response(), getSelf());
    }

    /** 논블로킹 지연 (Thread.sleep 대신 delayedExecutor 사용) */
    private CompletionStage<Void> delay(long millis) {
        return CompletableFuture.supplyAsync(
                () -> null,
                CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS)
        );
    }
}