Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
themeEmacs
linenumberstrue
package com.example.cafe24.actor;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.SystemMaterializer;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import com.example.cafe24.message.Messages.StreamEnvelope;
import com.example.cafe24.message.Messages.StreamResult;
import scala.concurrent.duration.FiniteDuration;

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

/**
 * Cafe24 API를 백프레셔를 적용하여 안전하게 호출하는 액터.
 *
 * <p>내부에 Akka Streams 파이프라인(유압장치)을 구성하여:
 * <ul>
 *   <li>Source.actorRef: 요청 유입구 (밸브)</li>
 *   <li>throttle: 유량 조절기 (초당 maxRequestsPerSecond 제한)</li>
 *   <li>mapAsync: 비동기 HTTP 호출 펌프 (블로킹 없음)</li>
 *   <li>Sink.foreach: 응답 배출구 (원래 요청자에게 전달)</li>
 * </ul>
 *
 * <p>사용자는 {@link ApiRequest}를 보내고 {@link ApiResponse}를 받기만 하면 됩니다.
 * 호출 제약(백프레셔, TPS, 429 재시도)은 이 액터가 모두 처리합니다.
 *
 * <p><b>재사용 방법</b>: apiBaseUrl과 maxRequestsPerSecond를 설정하여
 * 다양한 Cafe24 API 엔드포인트에 대해 독립적인 안전 호출 장치를 생성할 수 있습니다.
 */
public class SafeApiCallerActor extends AbstractActor {

    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final String apiBaseUrl;
    private final int maxRequestsPerSecond;
    private final HttpClient httpClient;
    private ActorRef streamEntry;

    public SafeApiCallerActor(String apiBaseUrl, int maxRequestsPerSecond) {
        this.apiBaseUrl = apiBaseUrl;
        this.maxRequestsPerSecond = maxRequestsPerSecond;
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
    }

    public static Props props(String apiBaseUrl, int maxRequestsPerSecond) {
        return Props.create(SafeApiCallerActor.class, apiBaseUrl, maxRequestsPerSecond);
    }

    @Override
    public void preStart() {
        var materializer = SystemMaterializer.get(getContext().getSystem()).materializer();

        // Akka Streams 백프레셔 파이프라인 구성
        // Source.actorRef(유입) → throttle(유량제한) → mapAsync(비동기호출) → Sink(응답전달)
        streamEntry = Source.<StreamEnvelope>actorRef(100, OverflowStrategy.dropNew())
                .throttle(
                        maxRequestsPerSecond,
                        FiniteDuration.create(1, TimeUnit.SECONDS),
                        maxRequestsPerSecond,
                        ThrottleMode.shaping()
                )
                .mapAsync(1, this::callApiWithAdaptiveBackpressure)
                .to(Sink.foreach(this::deliverResponse))
                .run(materializer);

        log.info("SafeApiCallerActor 초기화 완료 - baseUrl={}, maxRps={}", apiBaseUrl, maxRequestsPerSecond);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(ApiRequest.class, request -> {
                    // 요청을 스트림 파이프라인에 투입 (논블로킹)
                    streamEntry.tell(new StreamEnvelope(request), getSelf());
                })
                .build();
    }

    /**
     * 적응형 백프레셔를 적용한 API 호출.
     * 응답 헤더의 버킷 사용률에 따라 추가 지연을 동적으로 적용합니다.
     */
    private CompletionStage<StreamResult> callApiWithAdaptiveBackpressure(StreamEnvelope envelope) {
        return executeHttpCall(envelope, 0)
                .exceptionally(ex -> {
                    log.error("API 호출 실패: {}", ex.getMessage());
                    return new StreamResult(
                            envelope.request(),
                            new ApiResponse(envelope.request().word(), "ERROR: " + ex.getMessage(), 500, 0, 0)
                    );
                });
    }

    private CompletionStage<StreamResult> executeHttpCall(StreamEnvelope envelope, int retryCount) {
        String word = envelope.request().word();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(apiBaseUrl + "/api/echo?word=" + word))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();

        return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenCompose(response -> {
                    int statusCode = response.statusCode();

                    if (statusCode == 429 && retryCount < 3) {
                        // Rate limited - X-Cafe24-Call-Remain 헤더 기반 대기 후 재시도
                        String remainStr = response.headers()
                                .firstValue("X-Cafe24-Call-Remain").orElse("1");
                        int waitSeconds = Integer.parseInt(remainStr);
                        log.warning("429 응답 수신 - {}초 후 재시도 (retry={})", waitSeconds, retryCount + 1);

                        return delay(waitSeconds * 1000L)
                                .thenCompose(v -> executeHttpCall(envelope, retryCount + 1));
                    }

                    // 응답 헤더에서 버킷 상태 파싱
                    String callLimit = response.headers()
                            .firstValue("X-Api-Call-Limit").orElse("0/10");
                    String[] parts = callLimit.split("/");
                    int bucketUsed = Integer.parseInt(parts[0]);
                    int bucketMax = Integer.parseInt(parts[1]);
                    double usageRatio = (double) bucketUsed / bucketMax;

                    StreamResult result = new StreamResult(
                            envelope.request(),
                            new ApiResponse(word, response.body(), statusCode, bucketUsed, bucketMax)
                    );

                    // 적응형 백프레셔: 버킷 사용률 기반 추가 지연
                    // 유압장치의 압력 게이지를 읽고 밸브를 조절하는 것과 같은 원리
                    long adaptiveDelayMs = 0;
                    if (usageRatio > 0.8) {
                        adaptiveDelayMs = 500;
                        log.info("버킷 사용률 {}% - 500ms 추가 지연 적용", (int) (usageRatio * 100));
                    } else if (usageRatio > 0.5) {
                        adaptiveDelayMs = 200;
                    }

                    if (adaptiveDelayMs > 0) {
                        return delay(adaptiveDelayMs).thenApply(v -> result);
                    }
                    return CompletableFuture.completedFuture(result);
                });
    }

    /** 스트림 출구: 완료된 응답을 원래 요청자에게 전달 */
    private void deliverResponse(StreamResult result) {
        ActorRef replyTo = result.originalRequest().replyTo();
        replyTo.tell(result.response(), getSelf());
    }

    /** 논블로킹 지연 (Thread.sleep 대신 delayedExecutor 사용) */
    private CompletionStage<Void> delay(long millis) {
        return CompletableFuture.supplyAsync(
                () -> null,
                CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS)
        );
    }
}



Code Block
themeEmacs
linenumberstrue
package com.example.cafe24;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.javadsl.TestKit;
import com.example.cafe24.actor.SafeApiCallerActor;
import com.example.cafe24.api.DummyCafe24Server;
import com.example.cafe24.message.Messages.ApiRequest;
import com.example.cafe24.message.Messages.ApiResponse;
import org.junit.jupiter.api.*;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Verification tests for the SafeApiCallerActor backpressure pipeline.
 *
 * <p>Scenarios:
 * <ol>
 *   <li>"hello" is answered with "world"</li>
 *   <li>any other word is echoed back</li>
 *   <li>direct concurrent calls (no backpressure) produce at least one 429</li>
 *   <li>a burst sent through the actor completes with 200 for every request</li>
 *   <li>the bucket state reported by the server is carried in the response</li>
 * </ol>
 */
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class SafeApiCallerTest {

    private static ActorSystem system;
    private static DummyCafe24Server dummyServer;
    private static final int SERVER_PORT = 18080;
    private static final int BUCKET_CAPACITY = 10;
    private static final int LEAK_RATE = 2; // bucket drains by 2 per second

    /** Starts the actor system and the dummy server once for the whole class. */
    @BeforeAll
    static void setup() throws Exception {
        system = ActorSystem.create("test-system");
        dummyServer = new DummyCafe24Server(SERVER_PORT, BUCKET_CAPACITY, LEAK_RATE);
        dummyServer.start();
        // Give the server time to come up.
        Thread.sleep(500);
    }

    /** Stops the dummy server and shuts the actor system down. */
    @AfterAll
    static void teardown() {
        dummyServer.stop();
        TestKit.shutdownActorSystem(system);
    }

    /** Resets the server bucket before every test and waits for it to settle. */
    @BeforeEach
    void resetServer() throws InterruptedException {
        dummyServer.reset();
        Thread.sleep(200);
    }

    /** Spawns a caller actor pointed at the dummy server and throttled to 2 requests per second. */
    private static ActorRef spawnCaller() {
        String baseUrl = "http://localhost:" + SERVER_PORT;
        return system.actorOf(SafeApiCallerActor.props(baseUrl, 2));
    }

    @Test
    @Order(1)
    @DisplayName("hello 요청 시 world 응답 반환")
    void testHelloWorld() {
        TestKit probe = new TestKit(system);
        ActorRef caller = spawnCaller();

        caller.tell(new ApiRequest("hello", probe.getRef()), probe.getRef());

        ApiResponse reply = probe.expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
        assertEquals("hello", reply.word());
        assertEquals("world", reply.result());
        assertEquals(200, reply.statusCode());
        probe.expectNoMessage(Duration.ofMillis(200));
    }

    @Test
    @Order(2)
    @DisplayName("일반 단어 에코 응답 반환")
    void testEchoResponse() {
        TestKit probe = new TestKit(system);
        ActorRef caller = spawnCaller();

        caller.tell(new ApiRequest("akka-stream", probe.getRef()), probe.getRef());

        ApiResponse reply = probe.expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
        assertEquals("akka-stream", reply.word());
        assertEquals("akka-stream", reply.result());
        assertEquals(200, reply.statusCode());
        probe.expectNoMessage(Duration.ofMillis(200));
    }

    @Test
    @Order(3)
    @DisplayName("직접 호출: 대량 동시 요청 시 429 발생 확인 (백프레셔 미적용)")
    void testDirectCallCauses429() {
        HttpClient client = HttpClient.newHttpClient();
        int totalRequests = 20;
        List<CompletableFuture<HttpResponse<String>>> futures = new ArrayList<>();

        // Fire 20 concurrent calls without backpressure; the bucket (capacity 10) should overflow.
        int index = 0;
        while (index < totalRequests) {
            URI target = URI.create("http://localhost:" + SERVER_PORT + "/api/echo?word=direct-" + index);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(target)
                    .timeout(Duration.ofSeconds(5))
                    .GET()
                    .build();
            futures.add(client.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
            index++;
        }

        // Wait for every call and tally the two status codes of interest.
        long count429 = 0;
        long count200 = 0;
        for (CompletableFuture<HttpResponse<String>> pending : futures) {
            int status = pending.join().statusCode();
            if (status == 429) {
                count429++;
            } else if (status == 200) {
                count200++;
            }
        }

        System.out.println("[직접호출] 총 " + totalRequests + "건 → 200: " + count200 + "건, 429: " + count429 + "건");
        assertTrue(count429 > 0, "백프레셔 없이 동시 " + totalRequests + "건 호출 시 429가 발생해야 합니다");
    }

    @Test
    @Order(4)
    @DisplayName("백프레셔 적용: 대량 burst 요청이 429 없이 모두 200 성공")
    void testBackpressurePrevents429() {
        TestKit probe = new TestKit(system);
        // Throttled to 2 requests per second, the same as the bucket leak rate.
        ActorRef caller = spawnCaller();

        int totalRequests = 15;
        for (int sent = 0; sent < totalRequests; sent++) {
            caller.tell(new ApiRequest("safe-" + sent, probe.getRef()), probe.getRef());
        }

        // Collect every response; the throttle spreads them over several seconds.
        List<ApiResponse> responses = new ArrayList<>();
        while (responses.size() < totalRequests) {
            responses.add(probe.expectMsgClass(Duration.ofSeconds(30), ApiResponse.class));
        }

        long successCount = 0;
        for (ApiResponse received : responses) {
            if (received.statusCode() == 200) {
                successCount++;
            }
        }

        System.out.println("[백프레셔] 총 " + totalRequests + "건 → 200: " + successCount + "건");
        for (ApiResponse received : responses) {
            System.out.println("  word=" + received.word() + ", result=" + received.result()
                    + ", bucket=" + received.bucketUsed() + "/" + received.bucketMax());
        }

        // Every response must be a 200 (no 429).
        assertEquals(totalRequests, successCount,
                "백프레셔 적용 시 모든 요청이 429 없이 성공해야 합니다");

        // Every body must be the echo of a "safe-N" word.
        for (ApiResponse received : responses) {
            assertTrue(received.result().startsWith("safe-"),
                    "에코 응답이어야 합니다: " + received.result());
        }
    }

    @Test
    @Order(5)
    @DisplayName("버킷 상태 헤더가 응답에 포함됨")
    void testBucketHeadersInResponse() {
        TestKit probe = new TestKit(system);
        ActorRef caller = spawnCaller();

        caller.tell(new ApiRequest("header-test", probe.getRef()), probe.getRef());

        ApiResponse reply = probe.expectMsgClass(Duration.ofSeconds(5), ApiResponse.class);
        assertTrue(reply.bucketMax() > 0, "버킷 최대값이 포함되어야 합니다");
        assertTrue(reply.bucketUsed() > 0, "버킷 사용량이 포함되어야 합니다");
        assertEquals(BUCKET_CAPACITY, reply.bucketMax(), "버킷 최대값이 서버 설정과 일치해야 합니다");
    }
}