응답을 기다려야하는 블락코드가 1초라고 가정해보자~  이것을 순차 처리했을때 60개를 처리하는데 1분이라는 시간이 걸리게 된다. 

블락없는 비동기 동시성 처리코드를 모두 준수하여 작성하면 위 문제를 다른 패턴으로 풀수 있지만


여기서는 블락킹 이 포함된 코드를, 병렬처리하고 TPS제어기를 분리하여 TPS이내에만 작동하도록 실행하는 코드를 구현해보자


TPS를 제어하기위해 그 값을 계산하고 계산한값만큼 추가적으로 블락을 시키는 방법은

단순해보이지만 이러한 시나리오는 대부분 실패를 하게됩니다.



구현부

사용코드는 스레드풀을 지정하여 병렬 처리 최대 능력치를 지정합니다.  일반적으로 블락킹 시간이 1초일때 10TPS가 필요하면 10스레드가 필요합니다.

스레드는 공짜가 아니니~ 스레드 1개당 할당되는 메모리를 확인하여 충분한 값을 사용합니다.

    private static final Executor executor = Executors.newFixedThreadPool(450);

    private static CompletionStage<String> callApiAsync(Integer param) {
        // CompletableFuture를 사용하여 비동기 처리 구현
        return CompletableFuture.supplyAsync(() -> {
            try {

                double dValue = Math.random();
                int iValue = (int)(dValue * 1000);
                Thread.sleep(iValue); // API 응답 시간을 시뮬레이션하기 위한 지연
                tpsActor.tell("CompletedEvent", ActorRef.noSender()); // TPS측정을 위
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Response for " + param;
        }, executor);
    }


이용부

TPS를 제어하고 동시처리수 만큼 명령이 발생하는 것은 비동기적 처리인 AkkaStream를 이용해 간단하게 구현할수 있습니다.

최종 실행지점을 sink로 연결하면 블락킹모드가 포함된 구현부를 이 시나리오대로 호출이 됩니다.

    public void ThrottleTest(int tps, int testCount ) {
        new TestKit(actorSystem) {
            {
                final ActorMaterializerSettings settings = ActorMaterializerSettings.create(actorSystem)
                        .withDispatcher("my-dispatcher-streamtest");

                final Materializer materializer = ActorMaterializer.create(settings, actorSystem);
                final TestKit probe = new TestKit(actorSystem);

                tpsActor = actorSystem.actorOf(TpsMeasurementActor.Props(), "TpsActor");
                tpsActor.tell(probe.getRef(), getRef());
                expectMsg(Duration.ofSeconds(1), "done");

                // Source 생성
                Source<Integer, NotUsed> source = Source.range(1, testCount);

                // 병렬 처리를 위한 Flow 정의
                final int parallelism = 450;
                Flow<Integer, String, NotUsed> parallelFlow = Flow.<Integer>create()
                        .mapAsync(parallelism, BackPressureTest::callApiAsync);

                // Buffer 설정 및 OverflowStrategy.backpressure 적용
                int bufferSize = 100000;
                Flow<Integer, Integer, NotUsed> backpressureFlow = Flow.<Integer>create()
                        .buffer(bufferSize, OverflowStrategy.backpressure());

                AtomicInteger processedCount = new AtomicInteger();

                // Sink 정의
                Sink<String, CompletionStage<Done>> sink = Sink.foreach(s -> {
                    //처리완료
                    processedCount.getAndIncrement();
                    if(processedCount.getAcquire() % 10 == 0) {
                        //System.out.println("Processed 10");
                    }
                });

                System.out.println("Run backpressureFlow bufferSize:"+bufferSize);

                // RunnableGraph 생성 및 실행
                source.via(backpressureFlow)
                        .throttle(tps, FiniteDuration.create(1, TimeUnit.SECONDS), tps, (ThrottleMode) ThrottleMode.shaping())
                        .via(parallelFlow)
                        .to(sink)
                        .run(materializer);

                within(
                        Duration.ofSeconds(15),
                        () -> {
                            // Will wait for the rest of the 10 seconds
                            expectNoMessage(Duration.ofSeconds(10));
                            return null;
                        });
            }
        };
    }
my-dispatcher-streamtest {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 50
  }
  throughput = 1
}



테스트 결과

블락킹이 포함된 코드를  TPS200 단위로~ 스레드를 활용 병렬처리 진행될수 있습니다.

  • 상위 스트림의 API TPS제약이 존재하며 병렬처리가 필요할시 심플하게 이용될수 있습니다.

전체 처리코드 : https://github.com/psmon/java-labs/blob/master/springweb/src/test/java/com/webnori/springweb/akka/stream/BackPressureTest.java#L234


Next : 빠른생산자와 느린 소비자 - API 호출편







  • No labels
Write a comment…