분산 어플리케이션 내에서 고성능 메시지전송을 위해 이러한 라우팅 전략이 필요하며

어플리케이션에 내재하여 손쉽게 사용이 가능합니다.


라우팅전략

성능관련 스케일 아웃은 라우팅 전략과 관련이 있고

운영중 장비를 Down없이 스케일 늘리고 줄일수 있는 전략은 클러스터링과 연관이 있다고 볼수 잇습니다.

기술적으로 AKKA내에서 클러스터는 목표는 병목 현상이 없는 탄력적인 분산형 피어 투 피어 네트워크라고 정의 내릴수 있습니다.

 AKKA에서는 다양한 라우팅 전략을  디플로이 환경설정 전략으로 최소의 코드변경만으로 적용가능합니다.

또한 이러한 전략을 적용하기 위해 운영중 장비를 Down없이  다이나믹하게 스케일아웃할수있는 

클러스터로 개념으로 확장 가능합니다. AKKA의 라우팅전략은 코드설계가아닌, 설정화로 이루어내니 

지원가능한 대표적인 라우팅을 설명합니다. ( 라이브러리 내에서는 지속적으로 새로운 라우팅 모델이 업데이트중에 있습니다.)

  • AKKA가 아니여도 이러한 라우팅 분산처리는 IT에서 이미 활용하고 있는 컨셉입니다.

RoundRobin



Round Robin Router

단순하게 들어온 메시지 순서대로, 순차적으로 대상 노드를 바꿔가며 전송시 사용

ex>단순한 RestAPI의 성능향상및 특정 노드 장애에 대응하는 다중화구성 ( 기본적으로 Pool에 등록된 Node Crash발생시 해당노드는 해당풀에서 자동제거됨)



Broadcast



Broadcast Router


어떠한 정보의 변경을 모든 노드가 알아야할시, 주로 전체 동기화및 전체 푸시용도

ConsistentHashing



ConsistentHash Router

특정 처리에 대해 해시값기반 베이스로 노드의 변경의 가능성을 최소화할때

ex>

  • 웹소켓 기능 보장 ( 핸드쉐이크과정에서 노드변경이되면 오동작)
  • 자체캐쉬 작동 보장(성능을 위해 노드 자체에 서버 캐시처리를 하였지만, 노드변경시 서버캐시 적용못받음)
  • 최초 설계에의한 제한적 분산(기능적으로 생성한 오브젝트가 예상되는 특정 노드에 있어야하는 경우등)
  • SSL및 서비스 로그인 세션유지 ( 노드변경시 재인증을 받아야하는 성능이슈 )
  • X에 해당하는 고객(또는 몰)의 외부적인 요소에의해 API 호출제약이 n이라고 가정하면 단순하게 분산처리가 되어버리면 TPS제어를 할수 없습니다. 클러스터내에 하우터가 조장되면 분산이 재조정되기 때문에 특정 컨디션의 데이터를 특정노드에 실행이 보장되면 단일노드 환경에서 원격캐싱이아닌 수백배 빠른 로컬캐시를 활용할수도 있으며 호출 제약또한 리모트가 아닌 인메모리 기능으로 심플하게 구현할수 있습니다.
    • Redis에대한 오해 : Redis가 Key/Value여서 RDB를 사용하는것보다 빠른 캐시개념으로 접근할수 있지만... 원격 접근이라는 네트워크 전달 비용이 있습니다. 인 메모리전략은 수백배/수천배 빠를수 있습니다. 로컬캐싱을 이용할수 있습니다.


ScatterGatherFirstCompleted



ScatterGatherFirstCompleted Router

성능을 위해 다중노드로 구성하였으며, 가장 빠르게 처리한 녀석의 결과를 사용할시

ex> 1개의 빠른인스턴스 검색시 인덱스된 데이터가 불규적으로 분산되거나 중복이있을시 , 가장 빠른 결과물을 사용할시


SmallestMailbox



SmallestMailbox Router

동일한 작업을 수행해도 완료시간을 동일하게 보장할수 없음으로

덜바쁜 노드에게 우선으로 일을 주는방식


TailChopping



기본적으로 랜덤 라우터이나, 최적 응답시간내에 반응못하면(5s)

마지막까지 처리는 하지만~ 특정노드를 잠시쉬게 하는 전략 
> GC를 제대로 할 시간을 주지 못할때 해당 노드는 GC를 계속 시도하느라 CPU를 점유해서 뻗을수 있습니다. 이때 빈도가 특정노드중 일부이며 잠쉬 쉬면 정상화가 될수도 있다란 측정이 가능 했을때 활용가능합니다.

추가정보


https://github.com/reactive-streams

여러 업체및 진영( Java/C#/SCALA/JS등 )에서 네트워크상의 비동기처리를 비롯하여 이와 관련한 문제에대해

공통적으로 고민하기 시작하였으며, 표준 인터페이스를 만드는 reactive-streams 활동으로 이어집니다.

akka를 비롯하여 위에서 언급한 stream처리가 필요한 플래폼들은 이 활동에 영향을 주거나/받았으며,

이러한 인터페이스에 대한 표준을 따르고 있습니다.

이것은 어떠한 구현체가 아니라, 구현을 위한 약속된 스펙입니다.

서로 다른 스트리밍구현이 상호운영이될수 있도록하는것이 이 프로젝트의 기대치입니다.







심플 라우팅 워커 설계

라우팅 전략은 다양한 유입처리를 어떻게 분배하여 성능적 또는 안정적으로 유실없이 모두 처리할것인가?

에 대한 해결책으로 ,  AKKA에서는 서비스코드 내에 직접 설계가 가능하도록 몇가지 툴을 제공해줍니다.

물론 이러한 목표를 달성하기위해 KAFKA와 같은 외부 MQ를 잘 사용하는것도 방법중에 하나일것입니다.

라우팅은 주로 분배및 작동방식을 의미를 하고, 라우티는 실제 Task를 처리할수있는 종단점에 위치한 액터입니다.

라우팅 방법 설정

application.conf
akka { 
 actor.deployment {
 /workers/router1 {
 router = round-robin-pool
 nr-of-instances = 5
 }
 } 
}

random-pool,balancing-pool 등 원하는 분배방식을 설정을 통해 변경 할수가 있으며

라우팅기능이 추상화되어 사용가능함으로, 코드내에서 간단하게 활용가능합니다.

일반적으로 스레드풀관리를 직접 구현하여 라우팅기능을 구현하는것보다는 훨씬 쉬우며

외부 MQ시스템을 활용하는것보다는 번거러울수 있으나, 이러한 기능들을 직접 설계에반영하여

쉽게 내재화할수 있다란것은 장점중하나이며, 직접구현을 시도할시

클러스터를 포함하여 라우팅기능에대한 이해가 높아지기때문에 외부시스템과 연동하거나 이용할때도 도움이됩니다. 



라우팅기능을 포함한, 작업을 분배시키는 Wokers의 구현과 사용이 얼마나 심플해지는지 살펴보겠습니다.


Works구현

@Component
@Scope("prototype")
public class Workers extends AbstractActor {
    private final LoggingAdapter log = Logging
            .getLogger(getContext().system(), "Workers");
    
    @Autowired
    private ApplicationContext context;    
    
    private ActorRef roundrobinpool = null;
        
    @Override public void preStart() {
		log.info("Workers::preStart");
		SpringExtension ext = context.getBean(SpringExtension.class);		


        //application.conf에 설정된 라우터방식으로 작동방식결정됨
		roundrobinpool = getContext().actorOf( FromConfig.getInstance().props( ext.props("testActor") ) , "router1");
	}
	
	@Override
	public Receive createReceive() {
		return receiveBuilder()
		.matchEquals("hi1",s->{  //hi1 메시지를 받으면 라운드로빈처리를함
			log.info("Workers::Received String message: {}", s);
			roundrobinpool.tell(s, getSender() );	// 입력된 잡(hi1)을 라운드 로빈라우터에게 메시지전송				
		})
		.matchAny(o -> log.info("received unknown message"))
		.build();
	}
}


테스트코드

	protected void routerTest(ActorSystem system,SpringExtension ext) {
	    new TestKit(system) {{
	    	final ActorRef workers = system.actorOf( ext.props("workers"),"workers");  // workers 액터생성	    		    
	    	workers.tell( "hi1", getRef() );
		      // await the correct response
		    expectMsg(java.time.Duration.ofSeconds(1), "hi too"); //라우티가 응답한 메시지를 검사
		    
	    	workers.tell( "hi1", getRef() );
		      // await the correct response
		    expectMsg(java.time.Duration.ofSeconds(1), "hi too");


		    workers.tell( "hi1", getRef() );
		    workers.tell( "hi1", getRef() );
		    workers.tell( "hi1", getRef() );
		    workers.tell( "hi1", getRef() );
		    workers.tell( "hi1", getRef() );
	    }};
	}



[INFO] [05/18/2018 13:26:18.720] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi1

[INFO] [05/18/2018 13:26:18.726] [AkkaTestApp-akka.actor.default-dispatcher-2] [TestActor] akka://AkkaTestApp/user/workers/router1/$a::Incommessage hi1
[INFO] [05/18/2018 13:26:18.732] [AkkaTestApp-akka.actor.default-dispatcher-2] [Workers] Workers::Received String message: hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-5] [TestActor] akka://AkkaTestApp/user/workers/router1/$b::Incommessage hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-5] [Workers] Workers::Received String message: hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-5] [Workers] Workers::Received String message: hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-5] [Workers] Workers::Received String message: hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-3] [TestActor] akka://AkkaTestApp/user/workers/router1/$c::Incommessage hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-5] [Workers] Workers::Received String message: hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-3] [TestActor] akka://AkkaTestApp/user/workers/router1/$d::Incommessage hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-5] [Workers] Workers::Received String message: hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-4] [TestActor] akka://AkkaTestApp/user/workers/router1/$e::Incommessage hi1
[INFO] [05/18/2018 13:26:18.733] [AkkaTestApp-akka.actor.default-dispatcher-9] [TestActor] akka://AkkaTestApp/user/workers/router1/$b::Incommessage hi1
[INFO] [05/18/2018 13:26:18.734] [AkkaTestApp-akka.actor.default-dispatcher-3] [TestActor] akka://AkkaTestApp/user/workers/router1/$a::Incommessage hi1


라우팅 기능 추가및 변경

위 작성된 코드에서, 랜던 라우트를 추가해보고 작동방식을 확인해보겠습니다.

얼마나 최소의 코드로 기존 가진기능을 유지하면서 기능추가가되는지 확인해보겠습니다.

//router2 설정
		/workers/router2 {
			router = random-pool
			nr-of-instances = 5
		}
// 랜덤풀 생성
randomPool = getContext().actorOf( FromConfig.getInstance().props( ext.props("testActor") ) , "router2");


//hi1을 받으면 라운드로빈, hi2를 받으면 랜덤풀로 작동된다.
		.matchEquals("hi1",s->{
			log.info("Workers::Received String message: {}", s);
			roundrobinpool.tell(s, getSender() );					
		})
		.matchEquals("hi2",s->{
			log.info("Workers::Received String message: {}", s);
			randomPool.tell(s, getSender() );					
		})

[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-6] [TestActor] akka://AkkaTestApp/user/workers/router2/$e::Incommessage hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-12] [TestActor] akka://AkkaTestApp/user/workers/router2/$a::Incommessage hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-6] [TestActor] akka://AkkaTestApp/user/workers/router2/$e::Incommessage hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-8] [TestActor] akka://AkkaTestApp/user/workers/router2/$d::Incommessage hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-6] [TestActor] akka://AkkaTestApp/user/workers/router2/$b::Incommessage hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-4] [Workers] Workers::Received String message: hi2
[INFO] [05/18/2018 13:50:08.645] [AkkaTestApp-akka.actor.default-dispatcher-6] [TestActor] akka://AkkaTestApp/user/workers/router2/$c::Incommessage hi2


고급적 라우팅기능 부여하기

우리가 지정한 옵션에따라 라우티가 동적으로 확장이되고 축소되는 기능을 부여할수가 있으며

성능문제를 해결하기위한 장치중 하나로 활용할수가 있습니다.


동적 라우티조절

akka.actor.deployment {
  /parent/router29 {
    router = round-robin-pool
    resizer {
      lower-bound = 2
      upper-bound = 15
      messages-per-resize = 100
    }
  }
}

위와같이 유입에따른 메시지처리능력에 따라 (보통 사용자가 늘어나는 구간을 램프업이라고 합니다.) 

라우티를 동적으로 늘리고 줄일수있으며, AKKA에서 몇가지 유용한 유틸을 제공합니다. 


커스텀한 라우터 설계

public class RedundancyRoutingLogic implements RoutingLogic {
  private final int nbrCopies;
  
  public RedundancyRoutingLogic(int nbrCopies) {
    this.nbrCopies = nbrCopies;  
  }
  RoundRobinRoutingLogic roundRobin = new RoundRobinRoutingLogic();
  
  @Override
  public Routee select(Object message, IndexedSeq<Routee> routees) {
    List<Routee> targets = new ArrayList<Routee>();
    for (int i = 0; i < nbrCopies; i++) {
      targets.add(roundRobin.select(message, routees));
    }
    return new SeveralRoutees(targets);
  }
}

AKKA에서 대부분 우리가 알고있고 쓸만한 라우터를 제공하지만, 제공되지 않는 특수한 라우터 방식을

사용할시 위와같이 라우터 설계가 가능합니다.


Dispatcher설정

akka.actor.deployment {
  /poolWithDispatcher {
    router = random-pool
    nr-of-instances = 5
    pool-dispatcher {
      fork-join-executor.parallelism-min = 5
      fork-join-executor.parallelism-max = 5
    }
  }
}

라우터에 병렬처리에 관련된 능력을 지정하고자할때 설정될수 있습니다.

최대 5개의 스레드를 동시에 사용하겠다고 하는 설정사항입니다.

성능을 고려하여 이값을 잘 지정하는것은 생각보다 어려운주제이지만, 설정은 쉽습니다.


참고링크: 여기 샘플코드는,아래문서를 참고하여 Spring Boot에 호환이되도록 재구성되고 테스트 되었습니다.





Write a comment…