리얼타임에 발생하는 스트리밍 이벤트처리에대한 생산량을 준실시간성으로 처리하여
짧은 주기의 벌크처리를 지속적으로 하는 전략입니다.
@Test @DisplayName("Actor - RoundRobinThrottleTest Test") public void RoundRobinThrottleTest(){ // 동시성 병렬처리 문제 // 스레드 3개 작동된다고 TPS3으로 작동되는것이 아니며 각 스레드는 대기가 있을수 있기때문입니다. // 생산명령을 3으로 조절하고, 처리 스레드는 N개가 있어야 3TPS처리에 가깝게 처리할수 있습니다. new TestKit(system) { { //Given int concurrencyCount = 10; //동시처리능력 int processCouuntPerSec = 3; //초당 처리 밸브 int maxBufferSize = 3000; //최대버퍼수(넘을시 Drop전략) int testCallCount = 100; final Materializer materializer = ActorMaterializer.create(system); List<String> paths = new ArrayList<>(); for(int i=0; i<concurrencyCount ; i++){ String pathName = "w" + (i+1); ActorRef work = system.actorOf(GreetingActor.Props("my-dispatcher-test1"),pathName); work.tell(new FakeSlowMode(), ActorRef.noSender()); paths.add("/user/" + pathName); } ActorRef router = system.actorOf(new RoundRobinGroup(paths).props(), "router"); final ActorRef throttler = Source.actorRef(maxBufferSize, OverflowStrategy.dropNew()) .throttle(processCouuntPerSec, FiniteDuration.create(1, TimeUnit.SECONDS), processCouuntPerSec, (ThrottleMode) ThrottleMode.shaping()) .to(Sink.actorRef(router, akka.NotUsed.getInstance())) .run(materializer); for(int i=0 ; i<testCallCount ; i++){ throttler.tell("#### Hello World!" + i ,ActorRef.noSender()); } int expectedCompleteTime = testCallCount / 3; expectNoMessage(Duration.ofSeconds(expectedCompleteTime)); } }; }