Page History
...
| Code Block | ||||
|---|---|---|---|---|
| ||||
package com.example.cafe24.actor;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.stream.OverflowStrategy;
import akka.stream.SystemMaterializer;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import com.example.cafe24.message.Messages.StreamEnvelope;
import com.example.cafe24.message.Messages.StreamResult;
import scala.concurrent.duration.FiniteDuration;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* Cafe24 API를 백프레셔를 적용하여 안전하게 호출하는 액터.
*
* <p>내부에 Akka Streams 파이프라인(유압장치)을 구성하여:
* <ul>
* <li>Source.actorRef: 요청 유입구 (밸브)</li>
* <li>throttle: 유량 조절기 (초당 maxRequestsPerSecond 제한)</li>
* <li>mapAsync: 비동기 HTTP 호출 펌프 (블로킹 없음)</li>
* <li>Sink.foreach: 응답 배출구 (원래 요청자에게 전달)</li>
* </ul>
*
* <p>사용자는 {@link ApiRequest}를 보내고 {@link ApiResponse}를 받기만 하면 됩니다.
* 호출 제약(백프레셔, TPS, 429 재시도)은 이 액터가 모두 처리합니다.
*
* <p><b>재사용 방법</b>: apiBaseUrl과 maxRequestsPerSecond를 설정하여
* 다양한 Cafe24 API 엔드포인트에 대해 독립적인 안전 호출 장치를 생성할 수 있습니다.
*/
public class SafeApiCallerActor extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private final String apiBaseUrl;
private final int maxRequestsPerSecond;
private final HttpClient httpClient;
private ActorRef streamEntry;
public SafeApiCallerActor(String apiBaseUrl, int maxRequestsPerSecond) {
this.apiBaseUrl = apiBaseUrl;
this.maxRequestsPerSecond = maxRequestsPerSecond;
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
}
public static Props props(String apiBaseUrl, int maxRequestsPerSecond) {
return Props.create(SafeApiCallerActor.class, apiBaseUrl, maxRequestsPerSecond);
}
@Override
public void preStart() {
var materializer = SystemMaterializer.get(getContext().getSystem()).materializer();
// Akka Streams 백프레셔 파이프라인 구성
// Source.actorRef(유입) → throttle(유량제한) → mapAsync(비동기호출) → Sink(응답전달)
streamEntry = Source.<StreamEnvelope>actorRef(100, OverflowStrategy.dropNew())
.throttle(
maxRequestsPerSecond,
FiniteDuration.create(1, TimeUnit.SECONDS),
maxRequestsPerSecond,
ThrottleMode.shaping()
)
.mapAsync(1, this::callApiWithAdaptiveBackpressure)
.to(Sink.foreach(this::deliverResponse))
.run(materializer);
log.info("SafeApiCallerActor 초기화 완료 - baseUrl={}, maxRps={}", apiBaseUrl, maxRequestsPerSecond);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ApiRequest.class, request -> {
// 요청을 스트림 파이프라인에 투입 (논블로킹)
streamEntry.tell(new StreamEnvelope(request), getSelf());
})
.build();
}
/**
* 적응형 백프레셔를 적용한 API 호출.
* 응답 헤더의 버킷 사용률에 따라 추가 지연을 동적으로 적용합니다.
*/
private CompletionStage<StreamResult> callApiWithAdaptiveBackpressure(StreamEnvelope envelope) {
return executeHttpCall(envelope, 0)
.exceptionally(ex -> {
log.error("API 호출 실패: {}", ex.getMessage());
return new StreamResult(
envelope.request(),
new ApiResponse(envelope.request().word(), "ERROR: " + ex.getMessage(), 500, 0, 0)
);
});
}
private CompletionStage<StreamResult> executeHttpCall(StreamEnvelope envelope, int retryCount) {
String word = envelope.request().word();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(apiBaseUrl + "/api/echo?word=" + word))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenCompose(response -> {
int statusCode = response.statusCode();
if (statusCode == 429 && retryCount < 3) {
// Rate limited - X-Cafe24-Call-Remain 헤더 기반 대기 후 재시도
String remainStr = response.headers()
.firstValue("X-Cafe24-Call-Remain").orElse("1");
int waitSeconds = Integer.parseInt(remainStr);
log.warning("429 응답 수신 - {}초 후 재시도 (retry={})", waitSeconds, retryCount + 1);
return delay(waitSeconds * 1000L)
.thenCompose(v -> executeHttpCall(envelope, retryCount + 1));
}
// 응답 헤더에서 버킷 상태 파싱
String callLimit = response.headers()
.firstValue("X-Api-Call-Limit").orElse("0/10");
String[] parts = callLimit.split("/");
int bucketUsed = Integer.parseInt(parts[0]);
int bucketMax = Integer.parseInt(parts[1]);
double usageRatio = (double) bucketUsed / bucketMax;
StreamResult result = new StreamResult(
envelope.request(),
new ApiResponse(word, response.body(), statusCode, bucketUsed, bucketMax)
);
// 적응형 백프레셔: 버킷 사용률 기반 추가 지연
// 유압장치의 압력 게이지를 읽고 밸브를 조절하는 것과 같은 원리
long adaptiveDelayMs = 0;
if (usageRatio > 0.8) {
adaptiveDelayMs = 500;
log.info("버킷 사용률 {}% - 500ms 추가 지연 적용", (int) (usageRatio * 100));
} else if (usageRatio > 0.5) {
adaptiveDelayMs = 200;
}
if (adaptiveDelayMs > 0) {
return delay(adaptiveDelayMs).thenApply(v -> result);
}
return CompletableFuture.completedFuture(result);
});
}
/** 스트림 출구: 완료된 응답을 원래 요청자에게 전달 */
private void deliverResponse(StreamResult result) {
ActorRef replyTo = result.originalRequest().replyTo();
replyTo.tell(result.response(), getSelf());
}
/** 논블로킹 지연 (Thread.sleep 대신 delayedExecutor 사용) */
private CompletionStage<Void> delay(long millis) {
return CompletableFuture.supplyAsync(
() -> null,
CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS)
);
}
}
|
| Code Block | ||||
|---|---|---|---|---|
| ||||
package com.example.cafe24;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.javadsl.TestKit;
import com.example.cafe24.actor.SafeApiCallerActor;
import com.example.cafe24.api.DummyCafe24Server;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import org.junit.jupiter.api.*;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.junit.jupiter.api.Assertions.*;
/**
* SafeApiCallerActor 백프레셔 장치 검증 테스트.
*
* <p>시나리오:
* <ol>
* <li>hello → world 변환 검증</li>
* <li>일반 단어 에코 검증</li>
* <li>직접 호출(백프레셔 미적용) 시 429 발생 확인</li>
* <li>백프레셔 적용 시 대량 burst 요청이 429 없이 모두 성공</li>
* </ol>
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class SafeApiCallerTest {
private static ActorSystem system;
private static DummyCafe24Server dummyServer;
private static final int SERVER_PORT = 18080;
private static final int BUCKET_CAPACITY = 10;
private static final int LEAK_RATE = 2; // 초당 2회 감소
@BeforeAll
static void setup() throws Exception {
system = ActorSystem.create("test-system");
dummyServer = new DummyCafe24Server(SERVER_PORT, BUCKET_CAPACITY, LEAK_RATE);
dummyServer.start();
// 서버 준비 대기
Thread.sleep(500);
}
@AfterAll
static void teardown() {
dummyServer.stop();
TestKit.shutdownActorSystem(system);
}
@BeforeEach
void resetServer() throws InterruptedException {
dummyServer.reset();
// 버킷 초기화 후 안정화 대기
Thread.sleep(200);
}
@Test
@Order(1)
@DisplayName("hello 요청 시 world 응답 반환")
void testHelloWorld() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("hello", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertEquals("hello", response.word());
assertEquals("world", response.result());
assertEquals(200, response.statusCode());
expectNoMessage(Duration.ofMillis(200));
}};
}
@Test
@Order(2)
@DisplayName("일반 단어 에코 응답 반환")
void testEchoResponse() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("akka-stream", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertEquals("akka-stream", response.word());
assertEquals("akka-stream", response.result());
assertEquals(200, response.statusCode());
expectNoMessage(Duration.ofMillis(200));
}};
}
@Test
@Order(3)
@DisplayName("직접 호출: 대량 동시 요청 시 429 발생 확인 (백프레셔 미적용)")
void testDirectCallCauses429() {
HttpClient client = HttpClient.newHttpClient();
int totalRequests = 20;
List<CompletableFuture<HttpResponse<String>>> futures = new ArrayList<>();
// 백프레셔 없이 20건 동시 호출 → 버킷(capacity=10) 초과 → 429 발생
for (int i = 0; i < totalRequests; i++) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://localhost:" + SERVER_PORT + "/api/echo?word=direct-" + i))
.timeout(Duration.ofSeconds(5))
.GET()
.build();
futures.add(client.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
}
long count429 = futures.stream()
.map(CompletableFuture::join)
.filter(r -> r.statusCode() == 429)
.count();
long count200 = futures.stream()
.map(CompletableFuture::join)
.filter(r -> r.statusCode() == 200)
.count();
System.out.println("[직접호출] 총 " + totalRequests + "건 → 200: " + count200 + "건, 429: " + count429 + "건");
assertTrue(count429 > 0, "백프레셔 없이 동시 " + totalRequests + "건 호출 시 429가 발생해야 합니다");
}
@Test
@Order(4)
@DisplayName("백프레셔 적용: 대량 burst 요청이 429 없이 모두 200 성공")
void testBackpressurePrevents429() {
new TestKit(system) {{
// 백프레셔 적용 - 초당 2건으로 throttle (버킷 leak rate와 동일)
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
int totalRequests = 15;
for (int i = 0; i < totalRequests; i++) {
caller.tell(new ApiRequest("safe-" + i, getRef()), getRef());
}
// 모든 응답 수집 (throttle로 인해 ~8초 소요)
List<ApiResponse> responses = new ArrayList<>();
for (int i = 0; i < totalRequests; i++) {
ApiResponse response = expectMsgClass(Duration.ofSeconds(30), ApiResponse.class);
responses.add(response);
}
// 모든 응답이 200이어야 함 (429 없음)
long successCount = responses.stream()
.filter(r -> r.statusCode() == 200)
.count();
System.out.println("[백프레셔] 총 " + totalRequests + "건 → 200: " + successCount + "건");
for (ApiResponse r : responses) {
System.out.println(" word=" + r.word() + ", result=" + r.result()
+ ", bucket=" + r.bucketUsed() + "/" + r.bucketMax());
}
assertEquals(totalRequests, successCount,
"백프레셔 적용 시 모든 요청이 429 없이 성공해야 합니다");
// 에코 검증
for (ApiResponse r : responses) {
assertTrue(r.result().startsWith("safe-"),
"에코 응답이어야 합니다: " + r.result());
}
}};
}
@Test
@Order(5)
@DisplayName("버킷 상태 헤더가 응답에 포함됨")
void testBucketHeadersInResponse() {
new TestKit(system) {{
ActorRef caller = system.actorOf(
SafeApiCallerActor.props("http://localhost:" + SERVER_PORT, 2));
caller.tell(new ApiRequest("header-test", getRef()), getRef());
ApiResponse response = expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
assertTrue(response.bucketMax() > 0, "버킷 최대값이 포함되어야 합니다");
assertTrue(response.bucketUsed() > 0, "버킷 사용량이 포함되어야 합니다");
assertEquals(BUCKET_CAPACITY, response.bucketMax(), "버킷 최대값이 서버 설정과 일치해야 합니다");
}};
}
}
|