Page History
카페24 API는 Leaky Bucket 알고리즘 기반의 호출 제약 정책을 적용하고 있습니다. 이에 따라 일정 수준 이상의 요청이 집중되면 429(Too Many Requests) 응답으로 차단되며, 이를 고려하지 않은 호출은 서비스 안정성을 해칠 수 있습니다. 본 글에서는 Akka Streams의 백프레셔(Backpressure) 기법과 액터 모델을 활용하여, API 호출 제약을 자동으로 감지하고 흐름을 조절하는 안전 호출 장치를 설계하고 구현한 과정을 공유합니다.
본 예제는 카페24 API를 대상으로 하지만, 카카오 API, 네이버 API 등 호출 제약이 존재하는 다양한 상류(Upstream) API에 범용적으로 적용 가능한 패턴입니다.
카페24 API 호출 제약 스펙
...
B2B 서비스 개발에서는 우리 팀(하류)과 외부 플랫폼 API(상류)의 관계를 명확히 정의하는 것이 중요합니다. DDD(Domain-Driven Design)의 컨텍스트 맵핑은 이러한 경계를 식별하고 협력 방식을 구조화하는 데 유용한 프레임워크를 제공합니다.
공개호스트 패턴 (Open Host Service)
공개호스트는 API가 표준화된 프로토콜로 문서화되어 공개된 패턴입니다. 하류 팀은 상류가 제공하는 API를 있는 그대로 준수해야 하며, 인터페이스 변경에 대한 협상력이 제한적입니다.
| draw.io Diagram | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
예를 들어, Facebook 로그인 API나 정부 공공데이터 OpenAPI를 이용하는 경우가 이에 해당합니다.
하류 개발팀은 상류 API의 스펙을 그대로 준수하며, 상류 팀은 전체 사용자의 의견을 수렴하여 개선할 수는 있지만 특정 팀의 요구에 맞추어 변경하지는 않습니다.
순응적 패턴 (Conformist)
순응적 패턴에서는 공급자(상류)가 인터페이스를 주도적으로 결정하고, 소비자(하류)가 이를 따르는 구조입니다. 공개호스트와 달리 하류 팀의 요구사항이 제한적으로 반영될 수 있는 여지가 있습니다.
draw.io Diagram border true diagramName upstream-ddd simpleViewer false width links auto tbstyle top lbox true diagramWidth 266 revision 1
Cafe24 API는 대부분 공개호스트 패턴에 해당하지만, 파트너 협약을 통해 제공되는 API도 존재합니다. 상류 기능에 오류가 있거나 개선이 필요한 경우 하류에서 요청할 수 있으나, 대부분은 상류 정책에 순응하는 구조입니다.
참고: 컨텍스트 맵핑과 API 설계 패턴에 대한 더 자세한 내용은 별도 문서를 참고하세요.
사전 조사: 호출 제약 알고리즘 파악
구현에 앞서 대상 API의 호출 제약 알고리즘과 대응 패턴을 사전에 조사하는 것이 중요합니다. 바이브 코딩으로 바로 돌입하기보다, 제약 조건을 먼저 파악해두면 설계 품질이 크게 향상됩니다.
Leaky Bucket 알고리즘
- 카페24는 API 요청을 Bucket에 쌓아두었다가, Bucket이 가득 차면 요청을 제한하는 방식인
"Leaky Bucket" 알고리즘을 사용하고 있습니다.
물을 담는 구멍 난 양동이를 상상해보세요
Leaky Bucket을 직관적으로 이해하려면 바닥에 구멍이 뚫린 양동이를 떠올려 보세요.
🧑💻 API 요청 = 물
🪣 버킷 = 요청 저장 공간
🕳️ 구멍 = 일정 속도로 빠져나가는 처리 속도
...
Leaky Bucket vs Backpressure: 핵심 차이
요청이 들어오면 → 버킷에 저장
버킷은 일정한 속도로만 요청을 처리
...
Backpressure는
→ 넘치기 전에 속도를 조절하는 방식(제어)
즉,
❌ 버리는 게 아니라
✅ 흐름을 조절합니다.
공개호스트 - OpenHostService
...
.
정리하면, 카페24 API(상류)는 Leaky Bucket으로 과도한 호출을 차단하면서 응답 헤더를 통해 현재 버킷 상태(압력 수치)를 힌트로 제공합니다.
하류(우리 시스템)는 이 힌트 값을 백프레셔의 압력 게이지로 활용하여, 호출 속도를 동적으로 조절하는 안전 호출 장치를 구성하는 것이 이번 구현의 핵심 전략입니다.
프롬프트 설계: 바이브 코딩의 시작점
| Code Block | ||
|---|---|---|
| ||
카페24 API의 호출제약 정책에 의해.. 안전하게 호출할수 있는 장치를 구현하려고 합니다.
# 활용 스킬
다음 스킬을 조회해 활용가능한지 체크후 스킬을 이용해주세요
스킬조회및 인식이 안되는경우 진행하지말고 멈춰주세요
- java-akka-classic
- java-akka-classic-test
# 프로젝트 지침
- 프로젝트 생성 위치 : skill-test/projects/sample22
- 실제 cafe24 API를 호출하지는 않으며...Leaky bucket 이 적용해 제약응답을하는 API를 더미API로 작성합니다. 더미 API는 테스트 장치로 이용될예정으로 로컬호출가능한 API로 작성해주세요
- API의 본문응답은 요청시 hello 인경우... 응답으로 world를 반환하고... 나머지 단어는 그대로 똑같이 응답으로 돌려주게 작성
- skill-test/docs/카페24API호출제약.md : 성능은 헤더값을 통해 응답이 되며 이값을 참고 해 백프레셔의 압력값으로 이용
- 1노드 테스트이기때문에 Leaky bucket은 적절하게 처리량을 다소 적은수준을 이용합니다. (성능수치 조절가능)
- 백프레셔 기법(AkkaStream장치이용)을 이용해 cafe24 api를 안전하게 호출하는 장치생성 재활용가능한 모델로 작성
- API를 안전하게 호출해주는 액터모델을 작성.. 이것을 이용하는 대상은 완료된 응답값을 받는것에만 집중을 하며... 호출제약(백프레셔,TPS등)은 관심이 없습니다.
- AkksStream과 액터모델을 상호 설계해 호출블락킹이아닌 스트림모드(유압장치)를 활용 블락킹을 최소화하는 방법으로 구현합니다.
# 테스트 지침
- 유닛테스트를 필수로 작성, 이 샘플프로젝트가 목적하고자하는 시나리오로 작성하고 검증해주세요
# 테스트 완료후 문서지침
- skill-test/result/TEST-Cafe24APILimit-RESULT.md : 테스트 수행후 결과 정리
- skill-test/result/TEST-Cafe24APILimit-GUIDE.md : 이 수행결과로 생성된 장치이용에 대한 가이드 |
- 스킬에는 스트림처리및 TPS제어등 동시성 제어를 위한 다양한 기능이 이미 포함되어 있어서 최초 수행 플랜프롬프트에서는 핵심 도메인규칙을 위주로만 설명가능합니다.
액터 스킬 모드: 재사용 가능한 코드 생성 도구
액터 스킬은 Java(Akka Classic), Kotlin(Pekko Typed), C#(Akka.NET) 세 가지 플랫폼을 지원하며, 스킬 스토어를 통해 간편하게 설치할 수 있습니다. 스킬에는 스트림 처리, TPS 제어, 동시성 관리를 위한 다양한 기능이 사전 포함되어 있어, 프롬프트에서는 핵심 도메인 규칙에만 집중하면 됩니다.
| draw.io Diagram | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
페이스북의 공개된 API를 이용하여 로그인 기능을 만들거나 정부 OpenAPI를 이용하여 기능을 개발하는경우
개발팀은 제공되는 API를 그대로 준수하고 따릅니다. 상류 제공 API팀은 전체 의견을 수렴해 기능개선을 할수도 있지만
특정팀의 의견을 위해 기능변경을 하지 않습니다.
순응적 패턴
언제 무엇을 받게될지 상류(Upstream)인 공급자(Provider)가 주로 결정을 하지고
이것을 제공받는 하류(DownStream)팀은 상류 스트림을 준수합니다.
공개호스트와는 다르게 하류팀의 요구사항을 받아들이기도합니다.
...
Cafe24 API의 경우 대부분 공개호스트이지만, 협약되어 제공되는 API가 있을수 있습니다.
협약관계 또는 상류의 기능이 오류가있거나 개선이 필요할시 공급받는 다운스트림에서 개선요청을 할수도 있습니다.
|
- 클로드 크롬이 컨플이 지원하는 DrawIO기능을 탐색해 아 아티컬을 참조 만든 아키텍트 - 이제 설치형을 구매할수 없겠지만 과거 설치해 이용하긴했으나 xml로 정의하고 편집하는 기능이 있다란 사실을 조작하는 구경하면서 발견
이 활동에 의해 생성된 코드및 수행결과를 살펴보자
테스트 수행 결과: 백프레셔 효과 검증
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
# TEST-Cafe24APILimit 테스트 결과
## 테스트 개요
카페24 API의 Leaky Bucket 호출 제약 정책에 대응하여, Akka Streams 백프레셔를 활용한 안전 호출 장치를 구현하고 검증한 결과입니다.
- **프로젝트 위치**: `skill-test/projects/sample22`
- **사용 스킬**: `java-akka-classic`, `java-akka-classic-test`
- **플랫폼**: Java 23 + Akka Classic 2.7.0 + Akka Streams
- **테스트 일자**: 2026-02-28
## 더미 서버 설정 (1노드 테스트)
| 항목 | 값 | 설명 |
|------|-----|------|
| Bucket Capacity | 10 | 최대 동시 요청 수 |
| Leak Rate | 2/sec | 초당 버킷 감소량 |
| 안전 호출 TPS | 2/sec | 스트림 throttle 설정값 |
## 테스트 결과
### 전체: 5건 PASSED / 0건 FAILED
| # | 테스트명 | 결과 | 비고 |
|---|---------|------|------|
| 1 | hello → world 변환 | PASSED | 요청 "hello" → 응답 "world" 정상 |
| 2 | 일반 단어 에코 | PASSED | 요청 "akka-stream" → 응답 "akka-stream" 정상 |
| 3 | 직접호출 429 발생 확인 | PASSED | 20건 동시 → 200: 10건, 429: **10건** |
| 4 | 백프레셔 burst 안전 처리 | PASSED | 15건 burst → 200: **15건**, 429: 0건 |
| 5 | 버킷 상태 헤더 검증 | PASSED | bucketUsed/bucketMax 정상 포함 |
### 핵심 비교: 직접 호출 vs 백프레셔 적용
```
[직접호출 - 백프레셔 미적용]
총 20건 동시 요청 → 200: 10건, 429: 10건 (50% 실패)
버킷 capacity(10) 초과로 절반이 거부됨
[백프레셔 적용 - SafeApiCallerActor]
총 15건 burst 요청 → 200: 15건, 429: 0건 (100% 성공)
버킷 사용량: 1~3 수준으로 안정적 유지
```
### 백프레셔 적용 시 버킷 사용량 추이
```
safe-0 → bucket=1/10
safe-1 → bucket=2/10
safe-2 → bucket=3/10
safe-3 → bucket=2/10 ← leak으로 감소
safe-4 → bucket=3/10
safe-5 → bucket=2/10 ← leak으로 감소
safe-6 → bucket=3/10
...반복 패턴 (2~3 수준 유지)
```
throttle(2 req/sec)과 서버 leak rate(2/sec)가 균형을 이루어 버킷이 안전 범위(1~3) 내에서 안정적으로 유지됩니다.
## 아키텍처 구성
```
[사용자] → ApiRequest → [SafeApiCallerActor]
│
┌─────────▼──────────┐
│ Akka Streams 파이프라인 │
│ │
│ Source.actorRef │ ← 유입 밸브 (buffer=100)
│ │ │
│ throttle(2/s) │ ← 유량 조절기
│ │ │
│ mapAsync(1) │ ← 비동기 HTTP 펌프
│ + 적응형 지연 │ (429 자동 재시도)
│ │ │
│ Sink.foreach │ ← 응답 배출구
└───────│────────────┘
▼
[사용자] ← ApiResponse ← replyTo.tell()
```
## 생성된 파일
| 파일 | 역할 |
|------|------|
| `message/Messages.java` | ApiRequest, ApiResponse, StreamEnvelope, StreamResult 메시지 정의 |
| `api/DummyCafe24Server.java` | Leaky Bucket 더미 API 서버 (JDK HttpServer 기반) |
| `actor/SafeApiCallerActor.java` | 백프레셔 적용 안전 호출 액터 (재사용 가능) |
| `SafeApiCallerTest.java` | 5건 유닛테스트 (TestKit 기반) |
|
장치 이용 가이드: SafeApiCallerActor 활용법
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
# Cafe24 API 안전 호출 장치 사용 가이드
## 개요
카페24 API의 Leaky Bucket 호출 제약 정책에 대응하는 **Akka Streams 기반 백프레셔 안전 호출 장치**입니다. 사용자는 API 호출 제약(TPS, 429 에러, 버킷 상태)을 전혀 신경 쓰지 않고, 요청을 보내고 결과만 받으면 됩니다.
## 핵심 컴포넌트
### SafeApiCallerActor
호출 제약을 내부적으로 처리하는 재사용 가능한 액터입니다.
```java
// 생성: API 기본 URL + 초당 최대 호출 수 설정
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("https://api.cafe24.com", 2),
"cafe24-api-caller"
);
// 요청: 단어와 응답 받을 ActorRef 전달
caller.tell(new ApiRequest("hello", myActorRef), ActorRef.noSender());
// 응답: ApiResponse 메시지로 수신
// - word: 원본 요청 단어
// - result: API 응답 본문
// - statusCode: HTTP 상태 코드
// - bucketUsed/bucketMax: 버킷 상태
```
### 내부 동작 (유압장치 비유)
```
요청 투입 ──→ [버퍼 탱크] ──→ [유량 조절 밸브] ──→ [비동기 펌프] ──→ 응답 배출
(tell) (100건 버퍼) (throttle 2/s) (mapAsync HTTP) (replyTo.tell)
│
[압력 게이지]
(X-Api-Call-Limit 헤더 읽기)
│
사용률 > 80%: +500ms 지연
사용률 > 50%: +200ms 지연
429 응답: 대기 후 재시도
```
## 사용 방법
### 1. 기본 사용법 (단일 API)
```java
// ActorSystem 초기화
ActorSystem system = ActorSystem.create("my-system");
// 안전 호출 장치 생성 (초당 2건 제한)
ActorRef apiCaller = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2),
"product-api"
);
// 요청 보내기 (호출 제약은 장치가 알아서 처리)
apiCaller.tell(new ApiRequest("products/count", self()), self());
// 응답 처리
receive(ApiResponse.class, response -> {
System.out.println("결과: " + response.result());
System.out.println("버킷 상태: " + response.bucketUsed() + "/" + response.bucketMax());
});
```
### 2. 다중 API 엔드포인트 (장치 재사용)
```java
// 서로 다른 API 엔드포인트에 독립적인 안전 호출 장치 생성
ActorRef productApi = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/products", 2),
"product-api"
);
ActorRef orderApi = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2/orders", 2),
"order-api"
);
// 각 장치는 독립적으로 백프레셔 적용
productApi.tell(new ApiRequest("count", self()), self());
orderApi.tell(new ApiRequest("list", self()), self());
```
### 3. 대량 처리 (burst 안전)
```java
// 100건의 요청을 한번에 투입해도 안전
for (int i = 0; i < 100; i++) {
apiCaller.tell(new ApiRequest("item-" + i, self()), self());
}
// → throttle이 초당 2건씩 처리
// → 버킷 오버플로 없이 전부 성공
// → 약 50초 후 모든 응답 수신
```
## 성능 튜닝
### throttle 설정 가이드
| Cafe24 버킷 설정 | 권장 throttle | 안전 마진 |
|-----------------|--------------|----------|
| capacity=40, leak=2/s | `maxRequestsPerSecond=2` | 최대 안전 (버킷 거의 미사용) |
| capacity=40, leak=2/s | `maxRequestsPerSecond=3` | 적절 (적응형 지연으로 보완) |
| capacity=40, leak=2/s | `maxRequestsPerSecond=5` | 공격적 (적응형 지연 의존) |
**권장**: `maxRequestsPerSecond <= leak rate`로 설정하면 버킷이 거의 차지 않아 가장 안전합니다.
### 버퍼 크기 조절
```java
// Source.actorRef의 버퍼 크기 (현재 100)
// 대량 burst가 예상되면 증가 가능
Source.<StreamEnvelope>actorRef(500, OverflowStrategy.dropNew())
```
| 버퍼 크기 | 용도 |
|----------|------|
| 100 | 일반 사용 (기본값) |
| 500 | 대량 burst 허용 |
| 1000 | 극대량 처리 |
### OverflowStrategy 선택
| 전략 | 설명 | 권장 상황 |
|------|------|----------|
| `dropNew()` | 버퍼 초과 시 새 요청 폐기 | 최신 요청 우선 (현재 설정) |
| `dropHead()` | 가장 오래된 요청 폐기 | 최신 요청 우선 |
| `backpressure()` | upstream에 압력 전파 | Source.queue 사용 시 |
| `fail()` | 스트림 실패 | 데이터 손실 불허 시 |
## 적응형 백프레셔 상세
SafeApiCallerActor는 API 응답 헤더를 읽어 동적으로 호출 속도를 조절합니다:
```
X-Api-Call-Limit: 8/10 → 사용률 80% → +500ms 지연 추가
X-Api-Call-Limit: 5/10 → 사용률 50% → +200ms 지연 추가
X-Api-Call-Limit: 3/10 → 사용률 30% → 추가 지연 없음
HTTP 429 응답 → X-Cafe24-Call-Remain 초만큼 대기 후 재시도 (최대 3회)
```
이 메커니즘은 throttle의 기본 유량 제한 위에 추가로 작동하여 이중 안전장치 역할을 합니다.
## 실제 Cafe24 API 적용 시 변경 사항
1. **DummyCafe24Server 제거**: 실제 API URL로 교체
2. **인증 헤더 추가**: `HttpRequest.newBuilder().header("Authorization", "Bearer " + token)`
3. **요청/응답 포맷 변경**: Messages.java의 ApiRequest/ApiResponse를 실제 API 스키마에 맞게 확장
4. **throttle 값 조정**: 실제 Cafe24 정책(capacity=40, leak=2/s)에 맞게 설정
```java
// 실제 적용 예시
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("https://mallid.cafe24api.com/api/v2", 2),
"cafe24-safe-caller"
);
```
## 의존성
```groovy
dependencies {
implementation "com.typesafe.akka:akka-actor_2.13:2.7.0"
implementation "com.typesafe.akka:akka-stream_2.13:2.7.0"
}
```
|
핵심 구현 코드
카페24 더미 API
| Code Block | ||
|---|---|---|
| ||
package com.example.cafe24.api;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Cafe24 API를 시뮬레이션하는 더미 서버.
*
* <p>Leaky Bucket 알고리즘으로 요청 수를 제한합니다.
* <ul>
* <li>버킷 용량(capacity)을 초과하면 429 Too Many Requests 반환</li>
* <li>버킷은 매초 leakRate만큼 자동 감소</li>
* <li>응답 헤더: X-Api-Call-Limit, X-Cafe24-Call-Usage, X-Cafe24-Call-Remain</li>
* </ul>
*
* <p>응답 규칙: "hello" 요청 → "world" 반환, 나머지 단어는 그대로 에코.
*/
public class DummyCafe24Server {
private final HttpServer server;
private final int bucketCapacity;
private final int leakRatePerSecond;
private final AtomicInteger currentBucketLevel = new AtomicInteger(0);
private final ScheduledExecutorService leakScheduler;
public DummyCafe24Server(int port, int bucketCapacity, int leakRatePerSecond) throws IOException {
this.bucketCapacity = bucketCapacity;
this.leakRatePerSecond = leakRatePerSecond;
// Leaky Bucket 드레인: 매초 leakRate만큼 버킷 수위 감소
this.leakScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "leaky-bucket-drain");
t.setDaemon(true);
return t;
});
this.leakScheduler.scheduleAtFixedRate(
() -> currentBucketLevel.updateAndGet(c -> Math.max(0, c - leakRatePerSecond)),
1, 1, TimeUnit.SECONDS
);
this.server = HttpServer.create(new InetSocketAddress(port), 0);
this.server.createContext("/api/echo", this::handleEcho);
this.server.setExecutor(Executors.newFixedThreadPool(4));
}
public void start() {
server.start();
}
public void stop() {
server.stop(0);
leakScheduler.shutdown();
}
/** 테스트 간 버킷 상태 초기화 */
public void reset() {
currentBucketLevel.set(0);
}
public int getCurrentBucketLevel() {
return currentBucketLevel.get();
}
public int getBucketCapacity() {
return bucketCapacity;
}
private void handleEcho(HttpExchange exchange) throws IOException {
if (!"GET".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(405, -1);
return;
}
int level = currentBucketLevel.incrementAndGet();
if (level > bucketCapacity) {
// 버킷 초과 - 429 반환 (실제 소비하지 않으므로 감소)
currentBucketLevel.decrementAndGet();
double usagePercent = ((double) level / bucketCapacity) * 100;
int remainSeconds = (int) Math.ceil((double) (level - bucketCapacity) / leakRatePerSecond) + 1;
exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity);
exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent));
exchange.getResponseHeaders().add("X-Cafe24-Call-Remain", String.valueOf(remainSeconds));
byte[] body = "Too Many Requests".getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(429, body.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(body);
}
return;
}
// 파라미터 파싱
String word = parseWordParam(exchange.getRequestURI().getQuery());
// "hello" → "world", 나머지는 그대로 에코
String result = "hello".equals(word) ? "world" : word;
// 응답 헤더 설정
double usagePercent = ((double) level / bucketCapacity) * 100;
exchange.getResponseHeaders().add("X-Api-Call-Limit", level + "/" + bucketCapacity);
exchange.getResponseHeaders().add("X-Cafe24-Call-Usage", String.format("%.1f", usagePercent));
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
byte[] responseBytes = result.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(200, responseBytes.length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(responseBytes);
}
}
private String parseWordParam(String query) {
if (query == null) return "";
for (String param : query.split("&")) {
String[] pair = param.split("=", 2);
if (pair.length == 2 && "word".equals(pair[0])) {
return URLDecoder.decode(pair[1], StandardCharsets.UTF_8);
}
}
return "";
}
}
|
백프레셔가 포함된 안전호출 액터모델
| Code Block | ||||
|---|---|---|---|---|
| ||||
package com.example.cafe24.actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.stream.OverflowStrategy;
import akka.stream.SystemMaterializer;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import com.example.cafe24.message.Messages.StreamEnvelope;
import com.example.cafe24.message.Messages.StreamResult;
import scala.concurrent.duration.FiniteDuration;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* Cafe24 API를 백프레셔를 적용하여 안전하게 호출하는 액터.
*
* <p>내부에 Akka Streams 파이프라인(유압장치)을 구성하여:
* <ul>
* <li>Source.actorRef: 요청 유입구 (밸브)</li>
* <li>throttle: 유량 조절기 (초당 maxRequestsPerSecond 제한)</li>
* <li>mapAsync: 비동기 HTTP 호출 펌프 (블로킹 없음)</li>
* <li>Sink.foreach: 응답 배출구 (원래 요청자에게 전달)</li>
* </ul>
*
* <p>사용자는 {@link ApiRequest}를 보내고 {@link ApiResponse}를 받기만 하면 됩니다.
* 호출 제약(백프레셔, TPS, 429 재시도)은 이 액터가 모두 처리합니다.
*
* <p><b>재사용 방법</b>: apiBaseUrl과 maxRequestsPerSecond를 설정하여
* 다양한 Cafe24 API 엔드포인트에 대해 독립적인 안전 호출 장치를 생성할 수 있습니다.
*/
public class SafeApiCallerActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private final String apiBaseUrl;
private final int maxRequestsPerSecond;
private final HttpClient httpClient;
private ActorRef streamEntry;
public SafeApiCallerActor(String apiBaseUrl, int maxRequestsPerSecond) {
this.apiBaseUrl = apiBaseUrl;
this.maxRequestsPerSecond = maxRequestsPerSecond;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
}
public static Props props(String apiBaseUrl, int maxRequestsPerSecond) {
return Props.create(SafeApiCallerActor.class, apiBaseUrl, maxRequestsPerSecond);
}
@Override
public void preStart() {
var materializer = SystemMaterializer.get(getContext().getSystem()).materializer();
// Akka Streams 백프레셔 파이프라인 구성
// Source.actorRef(유입) → throttle(유량제한) → mapAsync(비동기호출) → Sink(응답전달)
streamEntry = Source.<StreamEnvelope>actorRef(100, OverflowStrategy.dropNew())
.throttle(
maxRequestsPerSecond,
FiniteDuration.create(1, TimeUnit.SECONDS),
maxRequestsPerSecond,
ThrottleMode.shaping()
)
.mapAsync(1, this::callApiWithAdaptiveBackpressure)
.to(Sink.foreach(this::deliverResponse))
.run(materializer);
log.info("SafeApiCallerActor 초기화 완료 - baseUrl={}, maxRps={}", apiBaseUrl, maxRequestsPerSecond);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ApiRequest.class, request -> {
// 요청을 스트림 파이프라인에 투입 (논블로킹)
streamEntry.tell(new StreamEnvelope(request), getSelf());
})
.build();
}
/**
* 적응형 백프레셔를 적용한 API 호출.
* 응답 헤더의 버킷 사용률에 따라 추가 지연을 동적으로 적용합니다.
*/
private CompletionStage<StreamResult> callApiWithAdaptiveBackpressure(StreamEnvelope envelope) {
return executeHttpCall(envelope, 0)
.exceptionally(ex -> {
log.error("API 호출 실패: {}", ex.getMessage());
return new StreamResult(
envelope.request(),
new ApiResponse(envelope.request().word(), "ERROR: " + ex.getMessage(), 500, 0, 0)
);
});
}
private CompletionStage<StreamResult> executeHttpCall(StreamEnvelope envelope, int retryCount) {
String word = envelope.request().word();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(apiBaseUrl + "/api/echo?word=" + word))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenCompose(response -> {
int statusCode = response.statusCode();
if (statusCode == 429 && retryCount < 3) {
// Rate limited - X-Cafe24-Call-Remain 헤더 기반 대기 후 재시도
String remainStr = response.headers()
.firstValue("X-Cafe24-Call-Remain").orElse("1");
int waitSeconds = Integer.parseInt(remainStr);
log.warning("429 응답 수신 - {}초 후 재시도 (retry={})", waitSeconds, retryCount + 1);
return delay(waitSeconds * 1000L)
.thenCompose(v -> executeHttpCall(envelope, retryCount + 1));
}
// 응답 헤더에서 버킷 상태 파싱
String callLimit = response.headers()
.firstValue("X-Api-Call-Limit").orElse("0/10");
String[] parts = callLimit.split("/");
int bucketUsed = Integer.parseInt(parts[0]);
int bucketMax = Integer.parseInt(parts[1]);
double usageRatio = (double) bucketUsed / bucketMax;
StreamResult result = new StreamResult(
envelope.request(),
new ApiResponse(word, response.body(), statusCode, bucketUsed, bucketMax)
);
// 적응형 백프레셔: 버킷 사용률 기반 추가 지연
// 유압장치의 압력 게이지를 읽고 밸브를 조절하는 것과 같은 원리
long adaptiveDelayMs = 0;
if (usageRatio > 0.8) {
adaptiveDelayMs = 500;
log.info("버킷 사용률 {}% - 500ms 추가 지연 적용", (int) (usageRatio * 100));
} else if (usageRatio > 0.5) {
adaptiveDelayMs = 200;
}
if (adaptiveDelayMs > 0) {
return delay(adaptiveDelayMs).thenApply(v -> result);
}
return CompletableFuture.completedFuture(result);
});
}
/** 스트림 출구: 완료된 응답을 원래 요청자에게 전달 */
private void deliverResponse(StreamResult result) {
ActorRef replyTo = result.originalRequest().replyTo();
replyTo.tell(result.response(), getSelf());
}
/** 논블로킹 지연 (Thread.sleep 대신 delayedExecutor 사용) */
private CompletionStage<Void> delay(long millis) {
return CompletableFuture.supplyAsync(
() -> null,
CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS)
);
}
}
|
유닛테스트
| Code Block | ||||
|---|---|---|---|---|
| ||||
package com.example.cafe24;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.javadsl.TestKit;
import com.example.cafe24.actor.SafeApiCallerActor;
import com.example.cafe24.api.DummyCafe24Server;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import org.junit.jupiter.api.*;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.junit.jupiter.api.Assertions.*;
/**
* SafeApiCallerActor 백프레셔 장치 검증 테스트.
*
* <p>시나리오:
* <ol>
* <li>hello → world 변환 검증</li>
* <li>일반 단어 에코 검증</li>
* <li>직접 호출(백프레셔 미적용) 시 429 발생 확인</li>
* <li>백프레셔 적용 시 대량 burst 요청이 429 없이 모두 성공</li>
* </ol>
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class SafeApiCallerTest {
private static ActorSystem system;
private static DummyCafe24Server dummyServer;
private static final int SERVER_PORT = 18080;
private static final int BUCKET_CAPACITY = 10;
private static final int LEAK_RATE = 2; // 초당 2회 감소
@BeforeAll
static void setup() throws Exception {
system = ActorSystem.create("test-system");
dummyServer = new DummyCafe24Server(SERVER_PORT, BUCKET_CAPACITY, LEAK_RATE);
dummyServer.start();
// 서버 준비 대기
Thread.sleep(500);
}
@AfterAll
static void teardown() {
dummyServer.stop();
TestKit.shutdownActorSystem(system);
}
@BeforeEach
void resetServer() throws InterruptedException {
dummyServer.reset();
// 버킷 초기화 후 안정화 대기
Thread.sleep(200);
}
@Test
@Order(1)
@DisplayName("hello 요청 시 world 응답 반환")
void testHelloWorld() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("hello", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertEquals("hello", response.word());
assertEquals("world", response.result());
assertEquals(200, response.statusCode());
expectNoMessage(Duration.ofMillis(200));
}};
}
@Test
@Order(2)
@DisplayName("일반 단어 에코 응답 반환")
void testEchoResponse() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("akka-stream", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertEquals("akka-stream", response.word());
assertEquals("akka-stream", response.result());
assertEquals(200, response.statusCode());
expectNoMessage(Duration.ofMillis(200));
}};
}
@Test
@Order(3)
@DisplayName("직접 호출: 대량 동시 요청 시 429 발생 확인 (백프레셔 미적용)")
void testDirectCallCauses429() {
HttpClient client = HttpClient.newHttpClient();
int totalRequests = 20;
List<CompletableFuture<HttpResponse<String>>> futures = new ArrayList<>();
// 백프레셔 없이 20건 동시 호출 → 버킷(capacity=10) 초과 → 429 발생
for (int i = 0; i < totalRequests; i++) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + SERVER_PORT + "/api/echo?word=direct-" + i))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
futures.add(client.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
}
long count429 = futures.stream()
.map(CompletableFuture::join)
.filter(r -> r.statusCode() == 429)
.count();
long count200 = futures.stream()
.map(CompletableFuture::join)
.filter(r -> r.statusCode() == 200)
.count();
System.out.println("[직접호출] 총 " + totalRequests + "건 → 200: " + count200 + "건, 429: " + count429 + "건");
assertTrue(count429 > 0, "백프레셔 없이 동시 " + totalRequests + "건 호출 시 429가 발생해야 합니다");
}
@Test
@Order(4)
@DisplayName("백프레셔 적용: 대량 burst 요청이 429 없이 모두 200 성공")
void testBackpressurePrevents429() {
new TestKit(system) {{
// 백프레셔 적용 - 초당 2건으로 throttle (버킷 leak rate와 동일)
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
int totalRequests = 15;
for (int i = 0; i < totalRequests; i++) {
caller.tell(new ApiRequest("safe-" + i, getRef()), getRef());
}
// 모든 응답 수집 (throttle로 인해 ~8초 소요)
List<ApiResponse> responses = new ArrayList<>();
for (int i = 0; i < totalRequests; i++) {
ApiResponse response = expectMsgClass(Duration.ofSeconds(30), ApiResponse.class);
responses.add(response);
}
// 모든 응답이 200이어야 함 (429 없음)
long successCount = responses.stream()
.filter(r -> r.statusCode() == 200)
.count();
System.out.println("[백프레셔] 총 " + totalRequests + "건 → 200: " + successCount + "건");
for (ApiResponse r : responses) {
System.out.println(" word=" + r.word() + ", result=" + r.result()
+ ", bucket=" + r.bucketUsed() + "/" + r.bucketMax());
}
assertEquals(totalRequests, successCount,
"백프레셔 적용 시 모든 요청이 429 없이 성공해야 합니다");
// 에코 검증
for (ApiResponse r : responses) {
assertTrue(r.result().startsWith("safe-"),
"에코 응답이어야 합니다: " + r.result());
}
}};
}
@Test
@Order(5)
@DisplayName("버킷 상태 헤더가 응답에 포함됨")
void testBucketHeadersInResponse() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("header-test", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertTrue(response.bucketMax() > 0, "버킷 최대값이 포함되어야 합니다");
assertTrue(response.bucketUsed() > 0, "버킷 사용량이 포함되어야 합니다");
assertEquals(BUCKET_CAPACITY, response.bucketMax(), "버킷 최대값이 서버 설정과 일치해야 합니다");
}};
}
}
|
지원가능 스킬 목록
다음 과제: 분산환경 클러스터 모드로의 확장
| 스킬 | 명령어 | 플랫폼 |
|---|---|---|
| Java Akka Classic | /java-akka-classic | Java + Akka Classic 2.7.x |
| Kotlin Pekko Typed | /kotlin-pekko-typed | Kotlin + Pekko Typed 1.4.x |
| C# Akka.NET | /dotnet-akka-net | C# + Akka.NET 1.5.x |
| Java Akka Classic Test | /java-akka-classic-test | Java + Akka Classic TestKit |
| Kotlin Pekko Typed Test | /kotlin-pekko-typed-test | Kotlin + Pekko Typed ActorTestKit |
| C# Akka.NET Test | /dotnet-akka-net-test | C# + Akka.TestKit.Xunit2 |
| Java Akka Classic Cluster | /java-akka-classic-cluster | Java + Akka Classic Cluster 2.7.x |
| Kotlin Pekko Typed Cluster | /kotlin-pekko-typed-cluster | Kotlin + Pekko Typed Cluster 1.4.x |
| C# Akka.NET Cluster | /dotnet-akka-net-cluster | C# + Akka.NET Cluster 1.5.x |
| Java Akka Classic Infra | /java-akka-classic-infra | Java + Akka Classic + Docker/K8s |
| Kotlin Pekko Typed Infra | /kotlin-pekko-typed-infra | Kotlin + Pekko Typed + Docker/K8s |
| C# Akka.NET Infra | /dotnet-akka-net-infra | C# + Akka.NET + Docker/K8s |
| AI Agent Pipeline (.NET) | /actor-ai-agent | C# + Akka.NET + LLM |
| AI Agent Pipeline (Java) | /actor-ai-agent-java | Java + Akka Classic + LLM |
| AI Agent Pipeline (Kotlin) | /actor-ai-agent-kotlin | Kotlin + Pekko Typed + LLM |
- 지금 작성된 로컬액터를
java-akka-classic-cluster,java-akka-classic-infra스킬을 활용해 분산처리 가능한 모델로 확장하고 쿠버네티스에서 분산배포후 안전호출량 모듈이 잘작동하나 테스트도 수행할것
스킬이 잘 설계되어 있으면, 프롬프트(플랜) 하나로 코드 생성부터 테스트까지 일관되게 수행할 수 있습니다. 이 경험을 통해 느낀 점은, 생성된 코드 자체보다 코드를 생성하는 프롬프트(플랜)와 스킬의 관리가 더 중요해졌다는 것입니다.
작업 완료 후 수행 결과를 셀프 제작 MCP 메모리("메모라이저")에 저장하면, 다음과 같은 메모리 조각이 추가되어 지금까지의 구현 여정을 간략히 조회할 수 있습니다. Claude Code에 자동 메모리가 기본 탑재되어 있지만, 테크 정리 목적의 장기 기억은 별도의 MCP 메모리로 역할을 분리하여 활용하고 있습니다.
- Claude Code에 자동 메모리가 기본 탑재되어 있지만, 셀프 제작 MCP 메모리는 테크 정리 장기 기억 메모리로 역할을 구분해 활용 중입니다. 메모리 기능이 기본 탑재됨에 따라 MCP 메모리를 우선 채택하지 않고 이름을 부여하여 "메모라이저"에 저장하는 방식으로, 바이브 경험의 "메모리" 키워드는 순정 모드에게 양보하였습니다.
Next : 클러스터로의 확장
로컬로 개발된 스펙 분산처리 클러스터로 확장하기 by 바이브모드more : 컨텍스트 맵핑과 API


