바이브를 돌리기전 무조건 수행해죠가 아닌~ 제약 알고리즘및 대응해야하는 패턴조사를 어느정도 준비해두면 도움될수 있습니다.
API 요청 수 제한 정책
카페24는 API 요청을 Bucket에 쌓아두었다가, Bucket이 가득 차면 요청을 제한하는 방식인 "Leaky Bucket"알고리즘을 사용하고 있습니다.
물을 담는 구멍 난 양동이를 상상해보세요.
🧑💻 API 요청 = 물
🪣 버킷 = 요청 저장 공간
🕳️ 구멍 = 일정 속도로 빠져나가는 처리 속도
흐름
요청이 들어오면 → 버킷에 저장
버킷은 일정한 속도로만 요청을 처리
입구에서 유체가 들어옵니다.
압력이 낮으면 → 밸브가 열려서 정상 흐름 유지
압력이 높아지면 →
스프링을 밀어 올리면서
밸브가 닫히기 시작
결과적으로 입구 쪽 압력이 올라가면서 유입이 줄어듭니다
Leaky Bucket은 → 넘치면 잘라버리는 방식(차단)
Backpressure는 → 넘치기 전에 속도를 조절하는 방식(제어)
즉, ❌ 버리는 게 아니라 ✅ 흐름을 조절합니다.
즉 카페24 API는 다운스트림이 호출을 많이 못하게 차단을 하고 압력수치에 사용할 힌트값을 알려줍니다.
다운스트림 파트는 상류가 알려주는 압력수치를 이용해 백프레셔의 압력값을 이용해 호출 조절기를 만드는 전략입니다.
프롬프트 - 플랜
카페24 API의 호출제약 정책에 의해.. 안전하게 호출할수 있는 장치를 구현하려고 합니다.
# 활용 스킬
다음 스킬을 조회해 활용가능한지 체크후 스킬을 이용해주세요
스킬조회및 인식이 안되는경우 진행하지말고 멈춰주세요
- java-akka-classic
- java-akka-classic-test
# 프로젝트 지침
- 프로젝트 생성 위치 : skill-test/projects/sample22
- 실제 cafe24 API를 호출하지는 않으며...Leaky bucket 이 적용해 제약응답을하는 API를 더미API로 작성합니다. 더미 API는 테스트 장치로 이용될예정으로 로컬호출가능한 API로 작성해주세요
- API의 본문응답은 요청시 hello 인경우... 응답으로 world를 반환하고... 나머지 단어는 그대로 똑같이 응답으로 돌려주게 작성
- skill-test/docs/카페24API호출제약.md : 성능은 헤더값을 통해 응답이 되며 이값을 참고 해 백프레셔의 압력값으로 이용
- 1노드 테스트이기때문에 Leaky bucket은 적절하게 처리량을 다소 적은수준을 이용합니다. (성능수치 조절가능)
- 백프레셔 기법(AkkaStream장치이용)을 이용해 cafe24 api를 안전하게 호출하는 장치생성 재활용가능한 모델로 작성
- API를 안전하게 호출해주는 액터모델을 작성.. 이것을 이용하는 대상은 완료된 응답값을 받는것에만 집중을 하며... 호출제약(백프레셔,TPS등)은 관심이 없습니다.
- AkksStream과 액터모델을 상호 설계해 호출블락킹이아닌 스트림모드(유압장치)를 활용 블락킹을 최소화하는 방법으로 구현합니다.
# 테스트 지침
- 유닛테스트를 필수로 작성, 이 샘플프로젝트가 목적하고자하는 시나리오로 작성하고 검증해주세요
# 테스트 완료후 문서지침
- skill-test/result/TEST-Cafe24APILimit-RESULT.md : 테스트 수행후 결과 정리
- skill-test/result/TEST-Cafe24APILimit-GUIDE.md : 이 수행결과로 생성된 장치이용에 대한 가이드
스킬에는 스트림처리및 TPS제어등 동시성 제어를 위한 다양한 기능이 이미 포함되어 있어서 최초 수행 플랜프롬프트에서는 핵심 도메인규칙을 위주로만 설명가능합니다.
액터 스킬모드
액터스킬은 자바/코틀린/닷넷 3가지 진영에 모드 사용할수 있으며 스킬스토어를 통해 간편 설치가 가능합니다.
# TEST-Cafe24APILimit 테스트 결과
## 테스트 개요
카페24 API의 Leaky Bucket 호출 제약 정책에 대응하여, Akka Streams 백프레셔를 활용한 안전 호출 장치를 구현하고 검증한 결과입니다.
- **프로젝트 위치**: `skill-test/projects/sample22`
- **사용 스킬**: `java-akka-classic`, `java-akka-classic-test`
- **플랫폼**: Java 23 + Akka Classic 2.7.0 + Akka Streams
- **테스트 일자**: 2026-02-28
## 더미 서버 설정 (1노드 테스트)
| 항목 | 값 | 설명 |
|------|-----|------|
| Bucket Capacity | 10 | 최대 동시 요청 수 |
| Leak Rate | 2/sec | 초당 버킷 감소량 |
| 안전 호출 TPS | 2/sec | 스트림 throttle 설정값 |
## 테스트 결과
### 전체: 5건 PASSED / 0건 FAILED
| # | 테스트명 | 결과 | 비고 |
|---|---------|------|------|
| 1 | hello → world 변환 | PASSED | 요청 "hello" → 응답 "world" 정상 |
| 2 | 일반 단어 에코 | PASSED | 요청 "akka-stream" → 응답 "akka-stream" 정상 |
| 3 | 직접호출 429 발생 확인 | PASSED | 20건 동시 → 200: 10건, 429: **10건** |
| 4 | 백프레셔 burst 안전 처리 | PASSED | 15건 burst → 200: **15건**, 429: 0건 |
| 5 | 버킷 상태 헤더 검증 | PASSED | bucketUsed/bucketMax 정상 포함 |
### 핵심 비교: 직접 호출 vs 백프레셔 적용
```
[직접호출 - 백프레셔 미적용]
총 20건 동시 요청 → 200: 10건, 429: 10건 (50% 실패)
버킷 capacity(10) 초과로 절반이 거부됨
[백프레셔 적용 - SafeApiCallerActor]
총 15건 burst 요청 → 200: 15건, 429: 0건 (100% 성공)
버킷 사용량: 1~3 수준으로 안정적 유지
```
### 백프레셔 적용 시 버킷 사용량 추이
```
safe-0 → bucket=1/10
safe-1 → bucket=2/10
safe-2 → bucket=3/10
safe-3 → bucket=2/10 ← leak으로 감소
safe-4 → bucket=3/10
safe-5 → bucket=2/10 ← leak으로 감소
safe-6 → bucket=3/10
...반복 패턴 (2~3 수준 유지)
```
throttle(2 req/sec)과 서버 leak rate(2/sec)가 균형을 이루어 버킷이 안전 범위(1~3) 내에서 안정적으로 유지됩니다.
## 아키텍처 구성
```
[사용자] → ApiRequest → [SafeApiCallerActor]
│
┌─────────▼──────────┐
│ Akka Streams 파이프라인 │
│ │
│ Source.actorRef │ ← 유입 밸브 (buffer=100)
│ │ │
│ throttle(2/s) │ ← 유량 조절기
│ │ │
│ mapAsync(1) │ ← 비동기 HTTP 펌프
│ + 적응형 지연 │ (429 자동 재시도)
│ │ │
│ Sink.foreach │ ← 응답 배출구
└───────│────────────┘
▼
[사용자] ← ApiResponse ← replyTo.tell()
```
## 생성된 파일
| 파일 | 역할 |
|------|------|
| `message/Messages.java` | ApiRequest, ApiResponse, StreamEnvelope, StreamResult 메시지 정의 |
| `api/DummyCafe24Server.java` | Leaky Bucket 더미 API 서버 (JDK HttpServer 기반) |
| `actor/SafeApiCallerActor.java` | 백프레셔 적용 안전 호출 액터 (재사용 가능) |
| `SafeApiCallerTest.java` | 5건 유닛테스트 (TestKit 기반) |
장치 이용가이드
# Cafe24 API 안전 호출 장치 사용 가이드
## 개요
카페24 API의 Leaky Bucket 호출 제약 정책에 대응하는 **Akka Streams 기반 백프레셔 안전 호출 장치**입니다. 사용자는 API 호출 제약(TPS, 429 에러, 버킷 상태)을 전혀 신경 쓰지 않고, 요청을 보내고 결과만 받으면 됩니다.
## 핵심 컴포넌트
### SafeApiCallerActor
호출 제약을 내부적으로 처리하는 재사용 가능한 액터입니다.
```java
// 생성: API 기본 URL + 초당 최대 호출 수 설정
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("https://api.cafe24.com", 2),
"cafe24-api-caller"
);
// 요청: 단어와 응답 받을 ActorRef 전달
caller.tell(new ApiRequest("hello", myActorRef), ActorRef.noSender());
// 응답: ApiResponse 메시지로 수신
// - word: 원본 요청 단어
// - result: API 응답 본문
// - statusCode: HTTP 상태 코드
// - bucketUsed/bucketMax: 버킷 상태
```
### 내부 동작 (유압장치 비유)
```
요청 투입 ──→ [버퍼 탱크] ──→ [유량 조절 밸브] ──→ [비동기 펌프] ──→ 응답 배출
(tell) (100건 버퍼) (throttle 2/s) (mapAsync HTTP) (replyTo.tell)
│
[압력 게이지]
(X-Api-Call-Limit 헤더 읽기)
│
사용률 > 80%: +500ms 지연
사용률 > 50%: +200ms 지연
429 응답: 대기 후 재시도
```
## 사용 방법
### 1. 기본 사용법 (단일 API)
```java
// ActorSystem 초기화
ActorSystem system = ActorSystem.create("my-system");
// 안전 호출 장치 생성 (초당 2건 제한)
ActorRef apiCaller = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2),
"product-api"
);
// 요청 보내기 (호출 제약은 장치가 알아서 처리)
apiCaller.tell(new ApiRequest("products/count", self()), self());
// 응답 처리
receive(ApiResponse.class, response -> {
System.out.println("결과: " + response.result());
System.out.println("버킷 상태: " + response.bucketUsed() + "/" + response.bucketMax());
});
```
### 2. 다중 API 엔드포인트 (장치 재사용)
```java
// 서로 다른 API 엔드포인트에 독립적인 안전 호출 장치 생성
ActorRef productApi = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/products", 2),
"product-api"
);
ActorRef orderApi = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/orders", 2),
"order-api"
);
// 각 장치는 독립적으로 백프레셔 적용
productApi.tell(new ApiRequest("count", self()), self());
orderApi.tell(new ApiRequest("list", self()), self());
```
### 3. 대량 처리 (burst 안전)
```java
// 100건의 요청을 한번에 투입해도 안전
for (int i = 0; i < 100; i++) {
apiCaller.tell(new ApiRequest("item-" + i, self()), self());
}
// → throttle이 초당 2건씩 처리
// → 버킷 오버플로 없이 전부 성공
// → 약 50초 후 모든 응답 수신
```
## 성능 튜닝
### throttle 설정 가이드
| Cafe24 버킷 설정 | 권장 throttle | 안전 마진 |
|-----------------|--------------|----------|
| capacity=40, leak=2/s | `maxRequestsPerSecond=2` | 최대 안전 (버킷 거의 미사용) |
| capacity=40, leak=2/s | `maxRequestsPerSecond=3` | 적절 (적응형 지연으로 보완) |
| capacity=40, leak=2/s | `maxRequestsPerSecond=5` | 공격적 (적응형 지연 의존) |
**권장**: `maxRequestsPerSecond <= leak rate`로 설정하면 버킷이 거의 차지 않아 가장 안전합니다.
### 버퍼 크기 조절
```java
// Source.actorRef의 버퍼 크기 (현재 100)
// 대량 burst가 예상되면 증가 가능
Source.<StreamEnvelope>actorRef(500, OverflowStrategy.dropNew())
```
| 버퍼 크기 | 용도 |
|----------|------|
| 100 | 일반 사용 (기본값) |
| 500 | 대량 burst 허용 |
| 1000 | 극대량 처리 |
### OverflowStrategy 선택
| 전략 | 설명 | 권장 상황 |
|------|------|----------|
| `dropNew()` | 버퍼 초과 시 새 요청 폐기 | 최신 요청 우선 (현재 설정) |
| `dropHead()` | 가장 오래된 요청 폐기 | 최신 요청 우선 |
| `backpressure()` | upstream에 압력 전파 | Source.queue 사용 시 |
| `fail()` | 스트림 실패 | 데이터 손실 불허 시 |
## 적응형 백프레셔 상세
SafeApiCallerActor는 API 응답 헤더를 읽어 동적으로 호출 속도를 조절합니다:
```
X-Api-Call-Limit: 8/10 → 사용률 80% → +500ms 지연 추가
X-Api-Call-Limit: 5/10 → 사용률 50% → +200ms 지연 추가
X-Api-Call-Limit: 3/10 → 사용률 30% → 추가 지연 없음
HTTP 429 응답 → X-Cafe24-Call-Remain 초만큼 대기 후 재시도 (최대 3회)
```
이 메커니즘은 throttle의 기본 유량 제한 위에 추가로 작동하여 이중 안전장치 역할을 합니다.
## 실제 Cafe24 API 적용 시 변경 사항
1. **DummyCafe24Server 제거**: 실제 API URL로 교체
2. **인증 헤더 추가**: `HttpRequest.newBuilder().header("Authorization", "Bearer " + token)`
3. **요청/응답 포맷 변경**: Messages.java의 ApiRequest/ApiResponse를 실제 API 스키마에 맞게 확장
4. **throttle 값 조정**: 실제 Cafe24 정책(capacity=40, leak=2/s)에 맞게 설정
```java
// 실제 적용 예시
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2),
"cafe24-safe-caller"
);
```
## 의존성
```groovy
dependencies {
implementation "com.typesafe.akka:akka-actor_2.13:2.7.0"
implementation "com.typesafe.akka:akka-stream_2.13:2.7.0"
}
```
생성된 핵심 코드
카페24 더미 API
package com.example.cafe24.api;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cafe24 API를 시뮬레이션하는 더미 서버.
*
* <p>Leaky Bucket 알고리즘으로 요청 수를 제한합니다.
* <ul>
* <li>버킷 용량(capacity)을 초과하면 429 Too Many Requests 반환</li>
* <li>버킷은 매초 leakRate만큼 자동 감소</li>
* <li>응답 헤더: X-Api-Call-Limit, X-Cafe24-Call-Usage, X-Cafe24-Call-Remain</li>
* </ul>
*
* <p>응답 규칙: "hello" 요청 → "world" 반환, 나머지 단어는 그대로 에코.
*/
public class DummyCafe24Server {
private final HttpServer server;
private final int bucketCapacity;
private final int leakRatePerSecond;
private final AtomicInteger currentBucketLevel = new AtomicInteger(0);
private final ScheduledExecutorService leakScheduler;
public DummyCafe24Server(int port, int bucketCapacity, int leakRatePerSecond) throws IOException {
this.bucketCapacity = bucketCapacity;
this.leakRatePerSecond = leakRatePerSecond;
// Leaky Bucket 드레인: 매초 leakRate만큼 버킷 수위 감소
this.leakScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "leaky-bucket-drain");
t.setDaemon(true);
return t;
});
this.leakScheduler.scheduleAtFixedRate(
() -> currentBucketLevel.updateAndGet(c -> Math.max(0, c - leakRatePerSecond)),
1, 1, TimeUnit.SECONDS
);
this.server = HttpServer.create(new InetSocketAddress(port), 0);
this.server.createContext("/api/echo", this::handleEcho);
this.server.setExecutor(Executors.newFixedThreadPool(4));
}
public void start() {
server.start();
}
public void stop() {
server.stop(0);
leakScheduler.shutdown();
}
/** 테스트 간 버킷 상태 초기화 */
public void reset() {
currentBucketLevel.set(0);
}
public int getCurrentBucketLevel() {
return currentBucketLevel.get();
}
public int getBucketCapacity() {
return bucketCapacity;
}
private void handleEcho(HttpExchange exchange) throws IOException {
if (!"GET".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(405, -1);
return;
}
int level = currentBucketLevel.incrementAndGet();
if (level > bucketCapacity) {
// 버킷 초과 - 429 반환 (실제 소비하지 않으므로 감소)
currentBucketLevel.decrementAndGet();
double usagePercent = ((double) level / bucketCapacity) * 100;
int remainSeconds = (int) Math.ceil((double) (level - bucketCapacity) / leakRatePerSecond) + 1;
exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity);
exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent));
exchange.getResponseHeaders().add("X-Cafe24-Call-Remain", String.valueOf(remainSeconds));
byte[] body = "Too Many Requests".getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(429, body.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(body);
}
return;
}
// 파라미터 파싱
String word = parseWordParam(exchange.getRequestURI().getQuery());
// "hello" → "world", 나머지는 그대로 에코
String result = "hello".equals(word) ? "world" : word;
// 응답 헤더 설정
double usagePercent = ((double) level / bucketCapacity) * 100;
exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity);
exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent));
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
byte[] responseBytes = result.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(200, responseBytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseBytes);
}
}
private String parseWordParam(String query) {
if (query == null) return "";
for (String param : query.split("&")) {
String[] pair = param.split("=", 2);
if (pair.length == 2 && "word".equals(pair[0])) {
return URLDecoder.decode(pair[1], StandardCharsets.UTF_8);
}
}
return "";
}
}
백프레셔가 포함된 안전호출 액터모델
package com.example.cafe24.actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.stream.OverflowStrategy;
import akka.stream.SystemMaterializer;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import com.example.cafe24.message.Messages.StreamEnvelope;
import com.example.cafe24.message.Messages.StreamResult;
import scala.concurrent.duration.FiniteDuration;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* Cafe24 API를 백프레셔를 적용하여 안전하게 호출하는 액터.
*
* <p>내부에 Akka Streams 파이프라인(유압장치)을 구성하여:
* <ul>
* <li>Source.actorRef: 요청 유입구 (밸브)</li>
* <li>throttle: 유량 조절기 (초당 maxRequestsPerSecond 제한)</li>
* <li>mapAsync: 비동기 HTTP 호출 펌프 (블로킹 없음)</li>
* <li>Sink.foreach: 응답 배출구 (원래 요청자에게 전달)</li>
* </ul>
*
* <p>사용자는 {@link ApiRequest}를 보내고 {@link ApiResponse}를 받기만 하면 됩니다.
* 호출 제약(백프레셔, TPS, 429 재시도)은 이 액터가 모두 처리합니다.
*
* <p><b>재사용 방법</b>: apiBaseUrl과 maxRequestsPerSecond를 설정하여
* 다양한 Cafe24 API 엔드포인트에 대해 독립적인 안전 호출 장치를 생성할 수 있습니다.
*/
public class SafeApiCallerActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private final String apiBaseUrl;
private final int maxRequestsPerSecond;
private final HttpClient httpClient;
private ActorRef streamEntry;
public SafeApiCallerActor(String apiBaseUrl, int maxRequestsPerSecond) {
this.apiBaseUrl = apiBaseUrl;
this.maxRequestsPerSecond = maxRequestsPerSecond;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
}
public static Props props(String apiBaseUrl, int maxRequestsPerSecond) {
return Props.create(SafeApiCallerActor.class, apiBaseUrl, maxRequestsPerSecond);
}
@Override
public void preStart() {
var materializer = SystemMaterializer.get(getContext().getSystem()).materializer();
// Akka Streams 백프레셔 파이프라인 구성
// Source.actorRef(유입) → throttle(유량제한) → mapAsync(비동기호출) → Sink(응답전달)
streamEntry = Source.<StreamEnvelope>actorRef(100, OverflowStrategy.dropNew())
.throttle(
maxRequestsPerSecond,
FiniteDuration.create(1, TimeUnit.SECONDS),
maxRequestsPerSecond,
ThrottleMode.shaping()
)
.mapAsync(1, this::callApiWithAdaptiveBackpressure)
.to(Sink.foreach(this::deliverResponse))
.run(materializer);
log.info("SafeApiCallerActor 초기화 완료 - baseUrl={}, maxRps={}", apiBaseUrl, maxRequestsPerSecond);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ApiRequest.class, request -> {
// 요청을 스트림 파이프라인에 투입 (논블로킹)
streamEntry.tell(new StreamEnvelope(request), getSelf());
})
.build();
}
/**
* 적응형 백프레셔를 적용한 API 호출.
* 응답 헤더의 버킷 사용률에 따라 추가 지연을 동적으로 적용합니다.
*/
private CompletionStage<StreamResult> callApiWithAdaptiveBackpressure(StreamEnvelope envelope) {
return executeHttpCall(envelope, 0)
.exceptionally(ex -> {
log.error("API 호출 실패: {}", ex.getMessage());
return new StreamResult(
envelope.request(),
new ApiResponse(envelope.request().word(), "ERROR: " + ex.getMessage(), 500, 0, 0)
);
});
}
private CompletionStage<StreamResult> executeHttpCall(StreamEnvelope envelope, int retryCount) {
String word = envelope.request().word();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(apiBaseUrl + "/api/echo?word=" + word))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenCompose(response -> {
int statusCode = response.statusCode();
if (statusCode == 429 && retryCount < 3) {
// Rate limited - X-Cafe24-Call-Remain 헤더 기반 대기 후 재시도
String remainStr = response.headers()
.firstValue("X-Cafe24-Call-Remain").orElse("1");
int waitSeconds = Integer.parseInt(remainStr);
log.warning("429 응답 수신 - {}초 후 재시도 (retry={})", waitSeconds, retryCount + 1);
return delay(waitSeconds * 1000L)
.thenCompose(v -> executeHttpCall(envelope, retryCount + 1));
}
// 응답 헤더에서 버킷 상태 파싱
String callLimit = response.headers()
.firstValue("X-Api-Call-Limit").orElse("0/10");
String[] parts = callLimit.split("/");
int bucketUsed = Integer.parseInt(parts[0]);
int bucketMax = Integer.parseInt(parts[1]);
double usageRatio = (double) bucketUsed / bucketMax;
StreamResult result = new StreamResult(
envelope.request(),
new ApiResponse(word, response.body(), statusCode, bucketUsed, bucketMax)
);
// 적응형 백프레셔: 버킷 사용률 기반 추가 지연
// 유압장치의 압력 게이지를 읽고 밸브를 조절하는 것과 같은 원리
long adaptiveDelayMs = 0;
if (usageRatio > 0.8) {
adaptiveDelayMs = 500;
log.info("버킷 사용률 {}% - 500ms 추가 지연 적용", (int) (usageRatio * 100));
} else if (usageRatio > 0.5) {
adaptiveDelayMs = 200;
}
if (adaptiveDelayMs > 0) {
return delay(adaptiveDelayMs).thenApply(v -> result);
}
return CompletableFuture.completedFuture(result);
});
}
/** 스트림 출구: 완료된 응답을 원래 요청자에게 전달 */
private void deliverResponse(StreamResult result) {
ActorRef replyTo = result.originalRequest().replyTo();
replyTo.tell(result.response(), getSelf());
}
/** 논블로킹 지연 (Thread.sleep 대신 delayedExecutor 사용) */
private CompletionStage<Void> delay(long millis) {
return CompletableFuture.supplyAsync(
() -> null,
CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS)
);
}
}
유닛테스트
package com.example.cafe24;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.javadsl.TestKit;
import com.example.cafe24.actor.SafeApiCallerActor;
import com.example.cafe24.api.DummyCafe24Server;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import org.junit.jupiter.api.*;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.junit.jupiter.api.Assertions.*;
/**
* SafeApiCallerActor 백프레셔 장치 검증 테스트.
*
* <p>시나리오:
* <ol>
* <li>hello → world 변환 검증</li>
* <li>일반 단어 에코 검증</li>
* <li>직접 호출(백프레셔 미적용) 시 429 발생 확인</li>
* <li>백프레셔 적용 시 대량 burst 요청이 429 없이 모두 성공</li>
* </ol>
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class SafeApiCallerTest {
private static ActorSystem system;
private static DummyCafe24Server dummyServer;
private static final int SERVER_PORT = 18080;
private static final int BUCKET_CAPACITY = 10;
private static final int LEAK_RATE = 2; // 초당 2회 감소
@BeforeAll
static void setup() throws Exception {
system = ActorSystem.create("test-system");
dummyServer = new DummyCafe24Server(SERVER_PORT, BUCKET_CAPACITY, LEAK_RATE);
dummyServer.start();
// 서버 준비 대기
Thread.sleep(500);
}
@AfterAll
static void teardown() {
dummyServer.stop();
TestKit.shutdownActorSystem(system);
}
@BeforeEach
void resetServer() throws InterruptedException {
dummyServer.reset();
// 버킷 초기화 후 안정화 대기
Thread.sleep(200);
}
@Test
@Order(1)
@DisplayName("hello 요청 시 world 응답 반환")
void testHelloWorld() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("hello", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertEquals("hello", response.word());
assertEquals("world", response.result());
assertEquals(200, response.statusCode());
expectNoMessage(Duration.ofMillis(200));
}};
}
@Test
@Order(2)
@DisplayName("일반 단어 에코 응답 반환")
void testEchoResponse() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("akka-stream", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertEquals("akka-stream", response.word());
assertEquals("akka-stream", response.result());
assertEquals(200, response.statusCode());
expectNoMessage(Duration.ofMillis(200));
}};
}
@Test
@Order(3)
@DisplayName("직접 호출: 대량 동시 요청 시 429 발생 확인 (백프레셔 미적용)")
void testDirectCallCauses429() {
HttpClient client = HttpClient.newHttpClient();
int totalRequests = 20;
List<CompletableFuture<HttpResponse<String>>> futures = new ArrayList<>();
// 백프레셔 없이 20건 동시 호출 → 버킷(capacity=10) 초과 → 429 발생
for (int i = 0; i < totalRequests; i++) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + SERVER_PORT + "/api/echo?word=direct-" + i))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
futures.add(client.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
}
long count429 = futures.stream()
.map(CompletableFuture::join)
.filter(r -> r.statusCode() == 429)
.count();
long count200 = futures.stream()
.map(CompletableFuture::join)
.filter(r -> r.statusCode() == 200)
.count();
System.out.println("[직접호출] 총 " + totalRequests + "건 → 200: " + count200 + "건, 429: " + count429 + "건");
assertTrue(count429 > 0, "백프레셔 없이 동시 " + totalRequests + "건 호출 시 429가 발생해야 합니다");
}
@Test
@Order(4)
@DisplayName("백프레셔 적용: 대량 burst 요청이 429 없이 모두 200 성공")
void testBackpressurePrevents429() {
new TestKit(system) {{
// 백프레셔 적용 - 초당 2건으로 throttle (버킷 leak rate와 동일)
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
int totalRequests = 15;
for (int i = 0; i < totalRequests; i++) {
caller.tell(new ApiRequest("safe-" + i, getRef()), getRef());
}
// 모든 응답 수집 (throttle로 인해 ~8초 소요)
List<ApiResponse> responses = new ArrayList<>();
for (int i = 0; i < totalRequests; i++) {
ApiResponse response = expectMsgClass(Duration.ofSeconds(30), ApiResponse.class);
responses.add(response);
}
// 모든 응답이 200이어야 함 (429 없음)
long successCount = responses.stream()
.filter(r -> r.statusCode() == 200)
.count();
System.out.println("[백프레셔] 총 " + totalRequests + "건 → 200: " + successCount + "건");
for (ApiResponse r : responses) {
System.out.println(" word=" + r.word() + ", result=" + r.result()
+ ", bucket=" + r.bucketUsed() + "/" + r.bucketMax());
}
assertEquals(totalRequests, successCount,
"백프레셔 적용 시 모든 요청이 429 없이 성공해야 합니다");
// 에코 검증
for (ApiResponse r : responses) {
assertTrue(r.result().startsWith("safe-"),
"에코 응답이어야 합니다: " + r.result());
}
}};
}
@Test
@Order(5)
@DisplayName("버킷 상태 헤더가 응답에 포함됨")
void testBucketHeadersInResponse() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("header-test", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertTrue(response.bucketMax() > 0, "버킷 최대값이 포함되어야 합니다");
assertTrue(response.bucketUsed() > 0, "버킷 사용량이 포함되어야 합니다");
assertEquals(BUCKET_CAPACITY, response.bucketMax(), "버킷 최대값이 서버 설정과 일치해야 합니다");
}};
}
}
백프레셔 기법으로 CAFE24-API 호출 안전 처리하기
최근 업데이트된 카페24 API 호출제약 스펙가이드에 따라 액터모델을 이용해 안전하게 호출처리하는 액터와 테스트 수행까지 사전에 준비된 스킬모드를 이용해 구현시도를 해보았습니다.
샘플은 카페24 API 호출 제약처리기이지만 , 호출제약이 다양한 여러 API에서 활용가능한 기법입니다.
카페24 API SPEC
주로 B2B를 다루는 비즈니스 영역에서 파악해야하는 주제로 우리 개발팀과 상류층(카카오API,카페24API,네이버API)의 관계맵핑이 어떻게 되고
이용을 하고 표현해야하는가? 는 DDD에 에서 경계구분후 컨텍스트 맵핑하는 방법을 참고하면 도움될수 있습니다.
공개호스트 - OpenHostService
이 프로토콜은 공개가 되어있고 이용하기위한 문서화가 잘되어 있으며 이것을 이용하는 팀은 상류 API를 그대로 이용을 해야합니다.
순응적 패턴
언제 무엇을 받게될지 상류(Upstream)인 공급자(Provider)가 주로 결정을 하지고 이것을 제공받는 하류(DownStream)팀은 상류 스트림을 준수합니다. 공개호스트와는 다르게 하류팀의 요구사항을 받아들이기도합니다.
more : 컨텍스트 맵핑과 API
제약 호출 알고리즘 파악하기
바이브를 돌리기전 무조건 수행해죠가 아닌~ 제약 알고리즘및 대응해야하는 패턴조사를 어느정도 준비해두면 도움될수 있습니다.
API 요청 수 제한 정책
"Leaky Bucket" 알고리즘을 사용하고 있습니다.
물을 담는 구멍 난 양동이를 상상해보세요.
🧑💻 API 요청 = 물
🪣 버킷 = 요청 저장 공간
🕳️ 구멍 = 일정 속도로 빠져나가는 처리 속도
흐름
요청이 들어오면 → 버킷에 저장
버킷은 일정한 속도로만 요청을 처리
입구에서 유체가 들어옵니다.
압력이 낮으면 → 밸브가 열려서 정상 흐름 유지
압력이 높아지면 →
스프링을 밀어 올리면서
밸브가 닫히기 시작
결과적으로 입구 쪽 압력이 올라가면서 유입이 줄어듭니다
Leaky Bucket은
→ 넘치면 잘라버리는 방식(차단)
Backpressure는
→ 넘치기 전에 속도를 조절하는 방식(제어)
즉,
❌ 버리는 게 아니라
✅ 흐름을 조절합니다.
즉 카페24 API는 다운스트림이 호출을 많이 못하게 차단을 하고 압력수치에 사용할 힌트값을 알려줍니다.
다운스트림 파트는 상류가 알려주는 압력수치를 이용해 백프레셔의 압력값을 이용해 호출 조절기를 만드는 전략입니다.
프롬프트 - 플랜
카페24 API의 호출제약 정책에 의해.. 안전하게 호출할수 있는 장치를 구현하려고 합니다. # 활용 스킬 다음 스킬을 조회해 활용가능한지 체크후 스킬을 이용해주세요 스킬조회및 인식이 안되는경우 진행하지말고 멈춰주세요 - java-akka-classic - java-akka-classic-test # 프로젝트 지침 - 프로젝트 생성 위치 : skill-test/projects/sample22 - 실제 cafe24 API를 호출하지는 않으며...Leaky bucket 이 적용해 제약응답을하는 API를 더미API로 작성합니다. 더미 API는 테스트 장치로 이용될예정으로 로컬호출가능한 API로 작성해주세요 - API의 본문응답은 요청시 hello 인경우... 응답으로 world를 반환하고... 나머지 단어는 그대로 똑같이 응답으로 돌려주게 작성 - skill-test/docs/카페24API호출제약.md : 성능은 헤더값을 통해 응답이 되며 이값을 참고 해 백프레셔의 압력값으로 이용 - 1노드 테스트이기때문에 Leaky bucket은 적절하게 처리량을 다소 적은수준을 이용합니다. (성능수치 조절가능) - 백프레셔 기법(AkkaStream장치이용)을 이용해 cafe24 api를 안전하게 호출하는 장치생성 재활용가능한 모델로 작성 - API를 안전하게 호출해주는 액터모델을 작성.. 이것을 이용하는 대상은 완료된 응답값을 받는것에만 집중을 하며... 호출제약(백프레셔,TPS등)은 관심이 없습니다. - AkksStream과 액터모델을 상호 설계해 호출블락킹이아닌 스트림모드(유압장치)를 활용 블락킹을 최소화하는 방법으로 구현합니다. # 테스트 지침 - 유닛테스트를 필수로 작성, 이 샘플프로젝트가 목적하고자하는 시나리오로 작성하고 검증해주세요 # 테스트 완료후 문서지침 - skill-test/result/TEST-Cafe24APILimit-RESULT.md : 테스트 수행후 결과 정리 - skill-test/result/TEST-Cafe24APILimit-GUIDE.md : 이 수행결과로 생성된 장치이용에 대한 가이드액터 스킬모드
액터스킬은 자바/코틀린/닷넷 3가지 진영에 모드 사용할수 있으며 스킬스토어를 통해 간편 설치가 가능합니다.
테스트 수행결과
# TEST-Cafe24APILimit 테스트 결과 ## 테스트 개요 카페24 API의 Leaky Bucket 호출 제약 정책에 대응하여, Akka Streams 백프레셔를 활용한 안전 호출 장치를 구현하고 검증한 결과입니다. - **프로젝트 위치**: `skill-test/projects/sample22` - **사용 스킬**: `java-akka-classic`, `java-akka-classic-test` - **플랫폼**: Java 23 + Akka Classic 2.7.0 + Akka Streams - **테스트 일자**: 2026-02-28 ## 더미 서버 설정 (1노드 테스트) | 항목 | 값 | 설명 | |------|-----|------| | Bucket Capacity | 10 | 최대 동시 요청 수 | | Leak Rate | 2/sec | 초당 버킷 감소량 | | 안전 호출 TPS | 2/sec | 스트림 throttle 설정값 | ## 테스트 결과 ### 전체: 5건 PASSED / 0건 FAILED | # | 테스트명 | 결과 | 비고 | |---|---------|------|------| | 1 | hello → world 변환 | PASSED | 요청 "hello" → 응답 "world" 정상 | | 2 | 일반 단어 에코 | PASSED | 요청 "akka-stream" → 응답 "akka-stream" 정상 | | 3 | 직접호출 429 발생 확인 | PASSED | 20건 동시 → 200: 10건, 429: **10건** | | 4 | 백프레셔 burst 안전 처리 | PASSED | 15건 burst → 200: **15건**, 429: 0건 | | 5 | 버킷 상태 헤더 검증 | PASSED | bucketUsed/bucketMax 정상 포함 | ### 핵심 비교: 직접 호출 vs 백프레셔 적용 ``` [직접호출 - 백프레셔 미적용] 총 20건 동시 요청 → 200: 10건, 429: 10건 (50% 실패) 버킷 capacity(10) 초과로 절반이 거부됨 [백프레셔 적용 - SafeApiCallerActor] 총 15건 burst 요청 → 200: 15건, 429: 0건 (100% 성공) 버킷 사용량: 1~3 수준으로 안정적 유지 ``` ### 백프레셔 적용 시 버킷 사용량 추이 ``` safe-0 → bucket=1/10 safe-1 → bucket=2/10 safe-2 → bucket=3/10 safe-3 → bucket=2/10 ← leak으로 감소 safe-4 → bucket=3/10 safe-5 → bucket=2/10 ← leak으로 감소 safe-6 → bucket=3/10 ...반복 패턴 (2~3 수준 유지) ``` throttle(2 req/sec)과 서버 leak rate(2/sec)가 균형을 이루어 버킷이 안전 범위(1~3) 내에서 안정적으로 유지됩니다. ## 아키텍처 구성 ``` [사용자] → ApiRequest → [SafeApiCallerActor] │ ┌─────────▼──────────┐ │ Akka Streams 파이프라인 │ │ │ │ Source.actorRef │ ← 유입 밸브 (buffer=100) │ │ │ │ throttle(2/s) │ ← 유량 조절기 │ │ │ │ mapAsync(1) │ ← 비동기 HTTP 펌프 │ + 적응형 지연 │ (429 자동 재시도) │ │ │ │ Sink.foreach │ ← 응답 배출구 └───────│────────────┘ ▼ [사용자] ← ApiResponse ← replyTo.tell() ``` ## 생성된 파일 | 파일 | 역할 | |------|------| | `message/Messages.java` | ApiRequest, ApiResponse, StreamEnvelope, StreamResult 메시지 정의 | | `api/DummyCafe24Server.java` | Leaky Bucket 더미 API 서버 (JDK HttpServer 기반) | | `actor/SafeApiCallerActor.java` | 백프레셔 적용 안전 호출 액터 (재사용 가능) | | `SafeApiCallerTest.java` | 5건 유닛테스트 (TestKit 기반) |장치 이용가이드
# Cafe24 API 안전 호출 장치 사용 가이드 ## 개요 카페24 API의 Leaky Bucket 호출 제약 정책에 대응하는 **Akka Streams 기반 백프레셔 안전 호출 장치**입니다. 사용자는 API 호출 제약(TPS, 429 에러, 버킷 상태)을 전혀 신경 쓰지 않고, 요청을 보내고 결과만 받으면 됩니다. ## 핵심 컴포넌트 ### SafeApiCallerActor 호출 제약을 내부적으로 처리하는 재사용 가능한 액터입니다. ```java // 생성: API 기본 URL + 초당 최대 호출 수 설정 ActorRef caller = system.actorOf( SafeApiCallerActor.props("https://api.cafe24.com", 2), "cafe24-api-caller" ); // 요청: 단어와 응답 받을 ActorRef 전달 caller.tell(new ApiRequest("hello", myActorRef), ActorRef.noSender()); // 응답: ApiResponse 메시지로 수신 // - word: 원본 요청 단어 // - result: API 응답 본문 // - statusCode: HTTP 상태 코드 // - bucketUsed/bucketMax: 버킷 상태 ``` ### 내부 동작 (유압장치 비유) ``` 요청 투입 ──→ [버퍼 탱크] ──→ [유량 조절 밸브] ──→ [비동기 펌프] ──→ 응답 배출 (tell) (100건 버퍼) (throttle 2/s) (mapAsync HTTP) (replyTo.tell) │ [압력 게이지] (X-Api-Call-Limit 헤더 읽기) │ 사용률 > 80%: +500ms 지연 사용률 > 50%: +200ms 지연 429 응답: 대기 후 재시도 ``` ## 사용 방법 ### 1. 기본 사용법 (단일 API) ```java // ActorSystem 초기화 ActorSystem system = ActorSystem.create("my-system"); // 안전 호출 장치 생성 (초당 2건 제한) ActorRef apiCaller = system.actorOf( SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2), "product-api" ); // 요청 보내기 (호출 제약은 장치가 알아서 처리) apiCaller.tell(new ApiRequest("products/count", self()), self()); // 응답 처리 receive(ApiResponse.class, response -> { System.out.println("결과: " + response.result()); System.out.println("버킷 상태: " + response.bucketUsed() + "/" + response.bucketMax()); }); ``` ### 2. 다중 API 엔드포인트 (장치 재사용) ```java // 서로 다른 API 엔드포인트에 독립적인 안전 호출 장치 생성 ActorRef productApi = system.actorOf( SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/products", 2), "product-api" ); ActorRef orderApi = system.actorOf( SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/orders", 2), "order-api" ); // 각 장치는 독립적으로 백프레셔 적용 productApi.tell(new ApiRequest("count", self()), self()); orderApi.tell(new ApiRequest("list", self()), self()); ``` ### 3. 대량 처리 (burst 안전) ```java // 100건의 요청을 한번에 투입해도 안전 for (int i = 0; i < 100; i++) { apiCaller.tell(new ApiRequest("item-" + i, self()), self()); } // → throttle이 초당 2건씩 처리 // → 버킷 오버플로 없이 전부 성공 // → 약 50초 후 모든 응답 수신 ``` ## 성능 튜닝 ### throttle 설정 가이드 | Cafe24 버킷 설정 | 권장 throttle | 안전 마진 | |-----------------|--------------|----------| | capacity=40, leak=2/s | `maxRequestsPerSecond=2` | 최대 안전 (버킷 거의 미사용) | | capacity=40, leak=2/s | `maxRequestsPerSecond=3` | 적절 (적응형 지연으로 보완) | | capacity=40, leak=2/s | `maxRequestsPerSecond=5` | 공격적 (적응형 지연 의존) | **권장**: `maxRequestsPerSecond <= leak rate`로 설정하면 버킷이 거의 차지 않아 가장 안전합니다. ### 버퍼 크기 조절 ```java // Source.actorRef의 버퍼 크기 (현재 100) // 대량 burst가 예상되면 증가 가능 Source.<StreamEnvelope>actorRef(500, OverflowStrategy.dropNew()) ``` | 버퍼 크기 | 용도 | |----------|------| | 100 | 일반 사용 (기본값) | | 500 | 대량 burst 허용 | | 1000 | 극대량 처리 | ### OverflowStrategy 선택 | 전략 | 설명 | 권장 상황 | |------|------|----------| | `dropNew()` | 버퍼 초과 시 새 요청 폐기 | 최신 요청 우선 (현재 설정) | | `dropHead()` | 가장 오래된 요청 폐기 | 최신 요청 우선 | | `backpressure()` | upstream에 압력 전파 | Source.queue 사용 시 | | `fail()` | 스트림 실패 | 데이터 손실 불허 시 | ## 적응형 백프레셔 상세 SafeApiCallerActor는 API 응답 헤더를 읽어 동적으로 호출 속도를 조절합니다: ``` X-Api-Call-Limit: 8/10 → 사용률 80% → +500ms 지연 추가 X-Api-Call-Limit: 5/10 → 사용률 50% → +200ms 지연 추가 X-Api-Call-Limit: 3/10 → 사용률 30% → 추가 지연 없음 HTTP 429 응답 → X-Cafe24-Call-Remain 초만큼 대기 후 재시도 (최대 3회) ``` 이 메커니즘은 throttle의 기본 유량 제한 위에 추가로 작동하여 이중 안전장치 역할을 합니다. ## 실제 Cafe24 API 적용 시 변경 사항 1. **DummyCafe24Server 제거**: 실제 API URL로 교체 2. **인증 헤더 추가**: `HttpRequest.newBuilder().header("Authorization", "Bearer " + token)` 3. **요청/응답 포맷 변경**: Messages.java의 ApiRequest/ApiResponse를 실제 API 스키마에 맞게 확장 4. **throttle 값 조정**: 실제 Cafe24 정책(capacity=40, leak=2/s)에 맞게 설정 ```java // 실제 적용 예시 ActorRef caller = system.actorOf( SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2), "cafe24-safe-caller" ); ``` ## 의존성 ```groovy dependencies { implementation "com.typesafe.akka:akka-actor_2.13:2.7.0" implementation "com.typesafe.akka:akka-stream_2.13:2.7.0" } ```생성된 핵심 코드
카페24 더미 API
package com.example.cafe24.api; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * Cafe24 API를 시뮬레이션하는 더미 서버. * * <p>Leaky Bucket 알고리즘으로 요청 수를 제한합니다. * <ul> * <li>버킷 용량(capacity)을 초과하면 429 Too Many Requests 반환</li> * <li>버킷은 매초 leakRate만큼 자동 감소</li> * <li>응답 헤더: X-Api-Call-Limit, X-Cafe24-Call-Usage, X-Cafe24-Call-Remain</li> * </ul> * * <p>응답 규칙: "hello" 요청 → "world" 반환, 나머지 단어는 그대로 에코. */ public class DummyCafe24Server { private final HttpServer server; private final int bucketCapacity; private final int leakRatePerSecond; private final AtomicInteger currentBucketLevel = new AtomicInteger(0); private final ScheduledExecutorService leakScheduler; public DummyCafe24Server(int port, int bucketCapacity, int leakRatePerSecond) throws IOException { this.bucketCapacity = bucketCapacity; this.leakRatePerSecond = leakRatePerSecond; // Leaky Bucket 드레인: 매초 leakRate만큼 버킷 수위 감소 this.leakScheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "leaky-bucket-drain"); t.setDaemon(true); return t; }); this.leakScheduler.scheduleAtFixedRate( () -> currentBucketLevel.updateAndGet(c -> Math.max(0, c - leakRatePerSecond)), 1, 1, TimeUnit.SECONDS ); this.server = HttpServer.create(new InetSocketAddress(port), 0); this.server.createContext("/api/echo", this::handleEcho); this.server.setExecutor(Executors.newFixedThreadPool(4)); } public void start() { server.start(); } public void stop() { server.stop(0); leakScheduler.shutdown(); } /** 테스트 간 버킷 상태 초기화 */ public void reset() { currentBucketLevel.set(0); } public int getCurrentBucketLevel() { return currentBucketLevel.get(); } public int getBucketCapacity() { return bucketCapacity; } private void handleEcho(HttpExchange exchange) throws IOException { if (!"GET".equals(exchange.getRequestMethod())) { exchange.sendResponseHeaders(405, -1); return; } int level = currentBucketLevel.incrementAndGet(); if (level > bucketCapacity) { // 버킷 초과 - 429 반환 (실제 소비하지 않으므로 감소) currentBucketLevel.decrementAndGet(); double usagePercent = ((double) level / bucketCapacity) * 100; int remainSeconds = (int) Math.ceil((double) (level - bucketCapacity) / leakRatePerSecond) + 1; exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity); exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent)); exchange.getResponseHeaders().add("X-Cafe24-Call-Remain", String.valueOf(remainSeconds)); byte[] body = "Too Many Requests".getBytes(StandardCharsets.UTF_8); exchange.sendResponseHeaders(429, body.length); try (OutputStream os = exchange.getResponseBody()) { os.write(body); } return; } // 파라미터 파싱 String word = parseWordParam(exchange.getRequestURI().getQuery()); // "hello" → "world", 나머지는 그대로 에코 String result = "hello".equals(word) ? "world" : word; // 응답 헤더 설정 double usagePercent = ((double) level / bucketCapacity) * 100; exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity); exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent)); exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8"); byte[] responseBytes = result.getBytes(StandardCharsets.UTF_8); exchange.sendResponseHeaders(200, responseBytes.length); try (OutputStream os = exchange.getResponseBody()) { os.write(responseBytes); } } private String parseWordParam(String query) { if (query == null) return ""; for (String param : query.split("&")) { String[] pair = param.split("=", 2); if (pair.length == 2 && "word".equals(pair[0])) { return URLDecoder.decode(pair[1], StandardCharsets.UTF_8); } } return ""; } }백프레셔가 포함된 안전호출 액터모델
package com.example.cafe24.actor; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.stream.OverflowStrategy; import akka.stream.SystemMaterializer; import akka.stream.ThrottleMode; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import com.example.cafe24.message.Messages.ApiRequest; import com.example.cafe24.message.Messages.ApiResponse; import com.example.cafe24.message.Messages.StreamEnvelope; import com.example.cafe24.message.Messages.StreamResult; import scala.concurrent.duration.FiniteDuration; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; /** * Cafe24 API를 백프레셔를 적용하여 안전하게 호출하는 액터. * * <p>내부에 Akka Streams 파이프라인(유압장치)을 구성하여: * <ul> * <li>Source.actorRef: 요청 유입구 (밸브)</li> * <li>throttle: 유량 조절기 (초당 maxRequestsPerSecond 제한)</li> * <li>mapAsync: 비동기 HTTP 호출 펌프 (블로킹 없음)</li> * <li>Sink.foreach: 응답 배출구 (원래 요청자에게 전달)</li> * </ul> * * <p>사용자는 {@link ApiRequest}를 보내고 {@link ApiResponse}를 받기만 하면 됩니다. * 호출 제약(백프레셔, TPS, 429 재시도)은 이 액터가 모두 처리합니다. * * <p><b>재사용 방법</b>: apiBaseUrl과 maxRequestsPerSecond를 설정하여 * 다양한 Cafe24 API 엔드포인트에 대해 독립적인 안전 호출 장치를 생성할 수 있습니다. */ public class SafeApiCallerActor extends AbstractActor { private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); private final String apiBaseUrl; private final int maxRequestsPerSecond; private final HttpClient httpClient; private ActorRef streamEntry; public SafeApiCallerActor(String apiBaseUrl, int maxRequestsPerSecond) { this.apiBaseUrl = apiBaseUrl; this.maxRequestsPerSecond = maxRequestsPerSecond; this.httpClient = HttpClient.newBuilder() .connectTimeout(Duration.ofSeconds(5)) .build(); } public static Props props(String apiBaseUrl, int maxRequestsPerSecond) { return Props.create(SafeApiCallerActor.class, apiBaseUrl, maxRequestsPerSecond); } @Override public void preStart() { var materializer = SystemMaterializer.get(getContext().getSystem()).materializer(); // Akka Streams 백프레셔 파이프라인 구성 // Source.actorRef(유입) → throttle(유량제한) → mapAsync(비동기호출) → Sink(응답전달) streamEntry = Source.<StreamEnvelope>actorRef(100, OverflowStrategy.dropNew()) .throttle( maxRequestsPerSecond, FiniteDuration.create(1, TimeUnit.SECONDS), maxRequestsPerSecond, ThrottleMode.shaping() ) .mapAsync(1, this::callApiWithAdaptiveBackpressure) .to(Sink.foreach(this::deliverResponse)) .run(materializer); log.info("SafeApiCallerActor 초기화 완료 - baseUrl={}, maxRps={}", apiBaseUrl, maxRequestsPerSecond); } @Override public Receive createReceive() { return receiveBuilder() .match(ApiRequest.class, request -> { // 요청을 스트림 파이프라인에 투입 (논블로킹) streamEntry.tell(new StreamEnvelope(request), getSelf()); }) .build(); } /** * 적응형 백프레셔를 적용한 API 호출. * 응답 헤더의 버킷 사용률에 따라 추가 지연을 동적으로 적용합니다. */ private CompletionStage<StreamResult> callApiWithAdaptiveBackpressure(StreamEnvelope envelope) { return executeHttpCall(envelope, 0) .exceptionally(ex -> { log.error("API 호출 실패: {}", ex.getMessage()); return new StreamResult( envelope.request(), new ApiResponse(envelope.request().word(), "ERROR: " + ex.getMessage(), 500, 0, 0) ); }); } private CompletionStage<StreamResult> executeHttpCall(StreamEnvelope envelope, int retryCount) { String word = envelope.request().word(); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(apiBaseUrl + "/api/echo?word=" + word)) .timeout(Duration.ofSeconds(5)) .GET() .build(); return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .thenCompose(response -> { int statusCode = response.statusCode(); if (statusCode == 429 && retryCount < 3) { // Rate limited - X-Cafe24-Call-Remain 헤더 기반 대기 후 재시도 String remainStr = response.headers() .firstValue("X-Cafe24-Call-Remain").orElse("1"); int waitSeconds = Integer.parseInt(remainStr); log.warning("429 응답 수신 - {}초 후 재시도 (retry={})", waitSeconds, retryCount + 1); return delay(waitSeconds * 1000L) .thenCompose(v -> executeHttpCall(envelope, retryCount + 1)); } // 응답 헤더에서 버킷 상태 파싱 String callLimit = response.headers() .firstValue("X-Api-Call-Limit").orElse("0/10"); String[] parts = callLimit.split("/"); int bucketUsed = Integer.parseInt(parts[0]); int bucketMax = Integer.parseInt(parts[1]); double usageRatio = (double) bucketUsed / bucketMax; StreamResult result = new StreamResult( envelope.request(), new ApiResponse(word, response.body(), statusCode, bucketUsed, bucketMax) ); // 적응형 백프레셔: 버킷 사용률 기반 추가 지연 // 유압장치의 압력 게이지를 읽고 밸브를 조절하는 것과 같은 원리 long adaptiveDelayMs = 0; if (usageRatio > 0.8) { adaptiveDelayMs = 500; log.info("버킷 사용률 {}% - 500ms 추가 지연 적용", (int) (usageRatio * 100)); } else if (usageRatio > 0.5) { adaptiveDelayMs = 200; } if (adaptiveDelayMs > 0) { return delay(adaptiveDelayMs).thenApply(v -> result); } return CompletableFuture.completedFuture(result); }); } /** 스트림 출구: 완료된 응답을 원래 요청자에게 전달 */ private void deliverResponse(StreamResult result) { ActorRef replyTo = result.originalRequest().replyTo(); replyTo.tell(result.response(), getSelf()); } /** 논블로킹 지연 (Thread.sleep 대신 delayedExecutor 사용) */ private CompletionStage<Void> delay(long millis) { return CompletableFuture.supplyAsync( () -> null, CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS) ); } }유닛테스트
package com.example.cafe24; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.javadsl.TestKit; import com.example.cafe24.actor.SafeApiCallerActor; import com.example.cafe24.api.DummyCafe24Server; import com.example.cafe24.message.Messages.ApiRequest; import com.example.cafe24.message.Messages.ApiResponse; import org.junit.jupiter.api.*; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.*; /** * SafeApiCallerActor 백프레셔 장치 검증 테스트. * * <p>시나리오: * <ol> * <li>hello → world 변환 검증</li> * <li>일반 단어 에코 검증</li> * <li>직접 호출(백프레셔 미적용) 시 429 발생 확인</li> * <li>백프레셔 적용 시 대량 burst 요청이 429 없이 모두 성공</li> * </ol> */ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) class SafeApiCallerTest { private static ActorSystem system; private static DummyCafe24Server dummyServer; private static final int SERVER_PORT = 18080; private static final int BUCKET_CAPACITY = 10; private static final int LEAK_RATE = 2; // 초당 2회 감소 @BeforeAll static void setup() throws Exception { system = ActorSystem.create("test-system"); dummyServer = new DummyCafe24Server(SERVER_PORT, BUCKET_CAPACITY, LEAK_RATE); dummyServer.start(); // 서버 준비 대기 Thread.sleep(500); } @AfterAll static void teardown() { dummyServer.stop(); TestKit.shutdownActorSystem(system); } @BeforeEach void resetServer() throws InterruptedException { dummyServer.reset(); // 버킷 초기화 후 안정화 대기 Thread.sleep(200); } @Test @Order(1) @DisplayName("hello 요청 시 world 응답 반환") void testHelloWorld() { new TestKit(system) {{ ActorRef caller = system.actorOf( SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2)); caller.tell(new ApiRequest("hello", getRef()), getRef()); ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class); assertEquals("hello", response.word()); assertEquals("world", response.result()); assertEquals(200, response.statusCode()); expectNoMessage(Duration.ofMillis(200)); }}; } @Test @Order(2) @DisplayName("일반 단어 에코 응답 반환") void testEchoResponse() { new TestKit(system) {{ ActorRef caller = system.actorOf( SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2)); caller.tell(new ApiRequest("akka-stream", getRef()), getRef()); ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class); assertEquals("akka-stream", response.word()); assertEquals("akka-stream", response.result()); assertEquals(200, response.statusCode()); expectNoMessage(Duration.ofMillis(200)); }}; } @Test @Order(3) @DisplayName("직접 호출: 대량 동시 요청 시 429 발생 확인 (백프레셔 미적용)") void testDirectCallCauses429() { HttpClient client = HttpClient.newHttpClient(); int totalRequests = 20; List<CompletableFuture<HttpResponse<String>>> futures = new ArrayList<>(); // 백프레셔 없이 20건 동시 호출 → 버킷(capacity=10) 초과 → 429 발생 for (int i = 0; i < totalRequests; i++) { HttpRequest request = HttpRequest.newBuilder() .uri(URI.create("http://localhost:" + SERVER_PORT + "/api/echo?word=direct-" + i)) .timeout(Duration.ofSeconds(5)) .GET() .build(); futures.add(client.sendAsync(request, HttpResponse.BodyHandlers.ofString())); } long count429 = futures.stream() .map(CompletableFuture::join) .filter(r -> r.statusCode() == 429) .count(); long count200 = futures.stream() .map(CompletableFuture::join) .filter(r -> r.statusCode() == 200) .count(); System.out.println("[직접호출] 총 " + totalRequests + "건 → 200: " + count200 + "건, 429: " + count429 + "건"); assertTrue(count429 > 0, "백프레셔 없이 동시 " + totalRequests + "건 호출 시 429가 발생해야 합니다"); } @Test @Order(4) @DisplayName("백프레셔 적용: 대량 burst 요청이 429 없이 모두 200 성공") void testBackpressurePrevents429() { new TestKit(system) {{ // 백프레셔 적용 - 초당 2건으로 throttle (버킷 leak rate와 동일) ActorRef caller = system.actorOf( SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2)); int totalRequests = 15; for (int i = 0; i < totalRequests; i++) { caller.tell(new ApiRequest("safe-" + i, getRef()), getRef()); } // 모든 응답 수집 (throttle로 인해 ~8초 소요) List<ApiResponse> responses = new ArrayList<>(); for (int i = 0; i < totalRequests; i++) { ApiResponse response = expectMsgClass(Duration.ofSeconds(30), ApiResponse.class); responses.add(response); } // 모든 응답이 200이어야 함 (429 없음) long successCount = responses.stream() .filter(r -> r.statusCode() == 200) .count(); System.out.println("[백프레셔] 총 " + totalRequests + "건 → 200: " + successCount + "건"); for (ApiResponse r : responses) { System.out.println(" word=" + r.word() + ", result=" + r.result() + ", bucket=" + r.bucketUsed() + "/" + r.bucketMax()); } assertEquals(totalRequests, successCount, "백프레셔 적용 시 모든 요청이 429 없이 성공해야 합니다"); // 에코 검증 for (ApiResponse r : responses) { assertTrue(r.result().startsWith("safe-"), "에코 응답이어야 합니다: " + r.result()); } }}; } @Test @Order(5) @DisplayName("버킷 상태 헤더가 응답에 포함됨") void testBucketHeadersInResponse() { new TestKit(system) {{ ActorRef caller = system.actorOf( SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2)); caller.tell(new ApiRequest("header-test", getRef()), getRef()); ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class); assertTrue(response.bucketMax() > 0, "버킷 최대값이 포함되어야 합니다"); assertTrue(response.bucketUsed() > 0, "버킷 사용량이 포함되어야 합니다"); assertEquals(BUCKET_CAPACITY, response.bucketMax(), "버킷 최대값이 서버 설정과 일치해야 합니다"); }}; } }