Akka의 Dispatcher는 메시지처리 성능과 관련한 튜닝 옵션이며 스레드를 직접 작성하여 Task를 다루는것보다

정책별로 디테일한 성능 튜닝이 가능합니다.  샘플은 JAVA이나, C#에서도 동일한 컨셉으로 사용가능합니다.


다음과 같은 심플한 성능 전략을 세웠다고 해봅시다.

빠른그룹 : 메시지 적재 100까지 제한을   두고 동시처리를 최대 10개까지만 사용하겠다.

느린그룹 : 동시처리 1개까지만 제한을 두겠다.

여기서 적재제한은, 메모리는 무한정하지 않기때문에 처리대기중인 메시지를 최대한 보유하고 있을수를 제한을 두는것이다.

예를 들어 100개의 제한을 두면 , 이미 메시지를 처리못하는 상태인 100개의 메시지가 차게되면(큐풀상태) 101번째 메시지는 드랍하는것이다.

간략하게 구현예제를 살펴봅시다.


튜닝의 설정화

fast-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
  parallelism-min = 5
  parallelism-factor = 2.0
  parallelism-max = 10
}
throughput = 100
}

slow-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 1
    parallelism-factor = 2.0
    parallelism-max = 1
  }
  throughput = 10
}

스레드풀의 성능전략을 외부 설정을 통해서 하였으며, 구현 코드에서는 이와관련한 이름을 사용하면 됩니다.


실제 적용 코드

public class SmartActor extends AbstractActor {
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Integer.class, number -> {    //단순하게 숫자의 입력을 받고 로그를 처리하는 액터이다.
                    Thread.sleep(500);  // delay for test
                    log.info(String.format("===== process msg:%s", name));
                })
                .matchAny(o -> log.info("received unknown message"))
                .build();
    }
}


@Test
public void testIt() throws InterruptedException {
    new TestKit(system) {{
        ActorRef probe = getRef();
        //6가지의 액터(태스크)를 두그룹으로 나누에 비동기 동시처리 진행
        ActorRef fastActor1 = system.actorOf(SmartActor.props("i'am fast").withDispatcher("fast-dispatcher"), "fast1");
        ActorRef fastActor2 = system.actorOf(SmartActor.props("i'am fast").withDispatcher("fast-dispatcher"), "fast2");
        ActorRef fastActor3 = system.actorOf(SmartActor.props("i'am fast").withDispatcher("fast-dispatcher"), "fast3");

        ActorRef slowActor1 = system.actorOf(SmartActor.props("i'am slow").withDispatcher("slow-dispatcher"), "slow1");
        ActorRef slowActor2 = system.actorOf(SmartActor.props("i'am slow").withDispatcher("slow-dispatcher"), "slow2");
        ActorRef slowActor3 = system.actorOf(SmartActor.props("i'am slow").withDispatcher("slow-dispatcher"), "slow3");

        for (int i = 0; i < 1000; i++) {
            fastActor1.tell(i, ActorRef.noSender());
            fastActor2.tell(i, ActorRef.noSender());
            fastActor3.tell(i, ActorRef.noSender());

            slowActor1.tell(i, ActorRef.noSender());
            slowActor2.tell(i, ActorRef.noSender());
            slowActor3.tell(i, ActorRef.noSender());
        }
    }};

실행결과:

[INFO] [04/02/2019 17:47:03.455] [TestSystem-fast-dispatcher-18] [akka://TestSystem/user/fast3] ===== process msg:i'am fast
[INFO] [04/02/2019 17:47:03.455] [TestSystem-fast-dispatcher-17] [akka://TestSystem/user/fast2] ===== process msg:i'am fast
[INFO] [04/02/2019 17:47:03.455] [TestSystem-fast-dispatcher-16] [akka://TestSystem/user/fast1] ===== process msg:i'am fast
[INFO] [04/02/2019 17:47:03.456] [TestSystem-slow-dispatcher-21] [akka://TestSystem/user/slow3] ===== process msg:i'am slow
[INFO] [04/02/2019 17:47:03.955] [TestSystem-fast-dispatcher-18] [akka://TestSystem/user/fast3] ===== process msg:i'am fast
[INFO] [04/02/2019 17:47:03.955] [TestSystem-fast-dispatcher-16] [akka://TestSystem/user/fast1] ===== process msg:i'am fast
[INFO] [04/02/2019 17:47:03.955] [TestSystem-fast-dispatcher-17] [akka://TestSystem/user/fast2] ===== process msg:i'am fast
[INFO] [04/02/2019 17:47:03.956] [TestSystem-slow-dispatcher-21] [akka://TestSystem/user/slow1] ===== process msg:i'am slow


10초구간의 로그만 추출하였으며, slow그룹은 1가지 스레드만 사용하여 1초동안 2개가 처리되었으며
fast 그룹은 여러개의 스레드를 사용하여 5개가 처리되었다. 


Dispatcher을 이용하는것은 스레드를 직접 생성하는것보다 유연한 스레드풀 사용 전략을 세울수가 있습니다.


참고소스 : 



  • No labels