응답을 기다려야하는 블락코드가 1초라고 가정해보자~ 이것을 순차 처리했을때 60개를 처리하는데 1분이라는 시간이 걸리게 된다.
블락없는 비동기 동시성 처리코드를 모두 준수하여 작성하면 위 문제를 다른 패턴으로 풀수 있지만
여기서는 블락킹 이 포함된 코드를, 병렬처리하고 TPS제어기를 분리하여 TPS이내에만 작동하도록 실행하는 코드를 구현해보자
TPS를 제어하기위해 그 값을 계산하고 계산한값만큼 추가적으로 블락을 시키는 방법은 단순해보이지만 이러한 시나리오는 대부분 실패를 하게됩니다. |
사용코드는 스레드풀을 지정하여 병렬 처리 최대 능력치를 지정합니다. 일반적으로 블락킹 시간이 1초일때 10TPS가 필요하면 10스레드가 필요합니다.
스레드는 공짜가 아니니~ 스레드 1개당 할당되는 메모리를 확인하여 충분한 값을 사용합니다.
private static final Executor executor = Executors.newFixedThreadPool(450); private static CompletionStage<String> callApiAsync(Integer param) { // CompletableFuture를 사용하여 비동기 처리 구현 return CompletableFuture.supplyAsync(() -> { try { double dValue = Math.random(); int iValue = (int)(dValue * 1000); Thread.sleep(iValue); // API 응답 시간을 시뮬레이션하기 위한 지연 tpsActor.tell("CompletedEvent", ActorRef.noSender()); // TPS측정을 위 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return "Response for " + param; }, executor); } |
TPS를 제어하고 동시처리수 만큼 명령이 발생하는 것은 비동기적 처리인 AkkaStream를 이용해 간단하게 구현할수 있습니다.
최종 실행지점을 sink로 연결하면 블락킹모드가 포함된 구현부를 이 시나리오대로 호출이 됩니다.
public void ThrottleTest(int tps, int testCount ) { new TestKit(actorSystem) { { final ActorMaterializerSettings settings = ActorMaterializerSettings.create(actorSystem) .withDispatcher("my-dispatcher-streamtest"); final Materializer materializer = ActorMaterializer.create(settings, actorSystem); final TestKit probe = new TestKit(actorSystem); tpsActor = actorSystem.actorOf(TpsMeasurementActor.Props(), "TpsActor"); tpsActor.tell(probe.getRef(), getRef()); expectMsg(Duration.ofSeconds(1), "done"); // Source 생성 Source<Integer, NotUsed> source = Source.range(1, testCount); // 병렬 처리를 위한 Flow 정의 final int parallelism = 450; Flow<Integer, String, NotUsed> parallelFlow = Flow.<Integer>create() .mapAsync(parallelism, BackPressureTest::callApiAsync); // Buffer 설정 및 OverflowStrategy.backpressure 적용 int bufferSize = 100000; Flow<Integer, Integer, NotUsed> backpressureFlow = Flow.<Integer>create() .buffer(bufferSize, OverflowStrategy.backpressure()); AtomicInteger processedCount = new AtomicInteger(); // Sink 정의 Sink<String, CompletionStage<Done>> sink = Sink.foreach(s -> { //처리완료 processedCount.getAndIncrement(); if(processedCount.getAcquire() % 10 == 0) { //System.out.println("Processed 10"); } }); System.out.println("Run backpressureFlow bufferSize:"+bufferSize); // RunnableGraph 생성 및 실행 source.via(backpressureFlow) .throttle(tps, FiniteDuration.create(1, TimeUnit.SECONDS), tps, (ThrottleMode) ThrottleMode.shaping()) .via(parallelFlow) .to(sink) .run(materializer); within( Duration.ofSeconds(15), () -> { // Will wait for the rest of the 10 seconds expectNoMessage(Duration.ofSeconds(10)); return null; }); } }; } |
my-dispatcher-streamtest { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { fixed-pool-size = 50 } throughput = 1 } |
블락킹이 포함된 코드를 TPS200 단위로~ 스레드를 활용 병렬처리 진행될수 있습니다.
Next : 빠른생산자와 느린 소비자 - API 호출편