You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

Dispatcher는 ActorSystem 내에서 실행되는 모든 코드를 스케줄링 합니다.

각 Actor의 처리량과 시간 점유율을 조정하여 각자에게 공정한 리소스를 제공합니다.


Dispatcher란?

Thread를 개발코드내에 직접 생성하고, 각각의 Job의 효율적인 수행을 위해서 

스레드 모델을 익히고 직접 구현에 반영해야 했으며 병렬처리 고성능 관점에서

발전해왔지만, 스레드를 직접 제어해야하는 개발방식이 일괄적이지 못하고

분산처리에서 이러한 개발방법이 큰 도움이 되지 못하다는것이며 결정적으로

스레드 사용비용은 꾀비싼 비용을 지불해야하며 제어하기가 어렵다란점입니다.

그래서 각 작은 단위의 Task가 비동기처리모델로 Thread를 효율적으로 사용하는

방법들이 등장하였으며 다시 이것 스케쥴링을 관리하는 기능들이 외부 옵션으로

지원하기 시작하였습니다. 그러한 기능을 통상적으로 Dispatcher 이라고 불리며

AKKA에만 존재하는 컨셉이아니고 다양한 병렬처리혹은 동시성처리를 하는 어떠한

장치들이 최대한 개발자의 개입의 최소화하고 설정만을 통해 성능에관련한 튜닝옵션을

제공하기 시작하였습니다.


Dispatcher전략

  • 스레드풀을 분리해라 : 응답이 오래걸리는것과 아닌것을 분리함으로, 오래걸리는것을 별도로 스레드수를 확보할수가 있다.
  • 순차처리보다 병렬처리를 활용 : 한가지의 요청이 끝나기전 다른 요청이 진행될수 없는경우와 동시에 요청을한후 머징처리되는게 효율적인경우를 분리 
  • 최종전략 : 순차적으로 처리해야할것과 병렬적으로 처리해야할것이 썩일수 있으며 튜닝은 이것을 고려해야한다.

스레드 스케줄러에 따른 전략

  • thread-pool-executor : 각 Task의 완료시간이 비슷할시 유용하다, 빠른 Task가 모두 완료될시 놀고있는 스레드가 생길수가 있다.
  • fork-join-executor : 각 Task의 완료시간이 차이가 있을시 유용하다, 놀고 있는 스레드의 자원 가져다가 활용할수있다. 일반적으로 유용하다.

이쯤되면 AKKA는 자바가가진 스레드 스케쥴러를 정확하게 이해하고

이것을 추상화하여 액터 고성능 메시지 처리에서 스레드를 직접 작성할필요가 없으나

자바가가진 스레드 모델이 가진 스케쥴러를 이해할 필요가 있습니다.

어떠한 기능을 추상화한다란것은 전통적인 개발방식의 메카니즘을 잘 이해하고 있다란의미입니다.


Dispatcher Option

  • parallelism-min = 2 : 동시에 활성화되는 최소 스레드수 (available processors * factor)
  • parallelism-max = 8 : 동시에 활성화되는 최대 스레드수
  • parallelism-factor = 2.0 : Core개수대비 최대가 될수있는 스레드수
  • throughput = 100 : 동시에 처리하는 메시지수
  • fixed-pool-size = 32 : 스레드 개수 고정

Dispatcher 설정 샘플

akka {
	loggers = ["akka.event.Logging$DefaultLogger"]
	loglevel = "INFO"
	stdout-loglevel = "ERROR"	
}

blocking-io-dispatcher {
	type = Dispatcher
	executor = "thread-pool-executor"
	thread-pool-executor {
	fixed-pool-size = 2
	}
	throughput = 1
}

my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}
	
my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}


동시처리 스레드 제한 예제

//application.conf 설정
blocking-io-dispatcher {
 type = Dispatcher
 executor = "thread-pool-executor"
 thread-pool-executor {
 fixed-pool-size = 2
 }
 throughput = 1
}


//구현부
//특정 액터 Dispatcher지정
ActorRef testActor = system.actorOf(ext.props("testActor")
 .withDispatcher("blocking-io-dispatcher"),
 "service1");


// 동시에 메시지 전송
for(int i=0;i<10;i++)
 testActor.tell("test message wiht dispatcher",null);

[INFO] [05/06/2018 23:27:21.795] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.798] [AkkaTestApp-blocking-io-dispatcher-9] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.799] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.800] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.801] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.801] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.802] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.807] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.807] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher
[INFO] [05/06/2018 23:27:21.807] [AkkaTestApp-blocking-io-dispatcher-8] [TestActor] Incommessage test message wiht dispatcher

동시처리수를 1개로 제한하고, 스레드수를 2개로 제한을하면 스레드모델중 세마포어와 유사한 스레드풀 기능을 수행하게 되며

일반적으로 심플하게 사용되는 방식입니다.동시 요청이 10개가 왔다고하면, 스레드 두개만 활용하여 분배되어 처리되게 됩니다.


자바가가진 스레드풀의 개념과, 이를 더 추상화해주는 dispatcher에 대한 더 자세한것은

아래 URL을 통해 한번더 학습하는것을 권장합니다.





  



  • No labels