클러스터를 이해하기위해 실제 실습 코드를 통해 클러스터를 작동시켜보겠습니다. 클러스터 라우터는 이전에 설명한 Actor-라우터와 동일함으로 라우터별로 별도의 부가 설명을 생략하겠습니다. |
다음과 같은 클러스터를 설계한다고 합시다.
기존에 설계된 액터와 약간다른점은, 클러스터에 필요한 잡담(클러스터 멤버의 변경사항을 스스로 감지하는)
기능을 추가 할것입니다. 잡담기능을 추가한것외에는 기존의 액터메시지 처리와 동일합니다.
public class ClusterActor : UntypedActor
{
protected Akka.Cluster.Cluster Cluster = Akka.Cluster.Cluster.Get(Context.System);
protected ILoggingAdapter Log = Context.GetLogger();
protected override void PreStart()
{
// subscribe to IMemberEvent and UnreachableMember events
Cluster.Subscribe(Self, ClusterEvent.InitialStateAsEvents,
new[] { typeof(ClusterEvent.IMemberEvent), typeof(ClusterEvent.UnreachableMember) });
}
/// <summary>
/// Re-subscribe on restart
/// </summary>
protected override void PostStop()
{
//정상적으로 Actor를 정지하면 멤버에서 탈퇴가됩니다.
Cluster.Unsubscribe(Self);
}
protected void OnReceiveData(object message)
{
Log.Info("Data:" + message.ToString());
}
protected override void OnReceive(object message)
{
var up = message as ClusterEvent.MemberUp;
if (up != null)
{
var mem = up;
Log.Info("Member is Up: {0}", mem.Member);
}
else if (message is ClusterEvent.UnreachableMember)
{
var unreachable = (ClusterEvent.UnreachableMember)message;
Log.Info("Member detected as unreachable: {0}", unreachable.Member);
//멤버와 갑자기 연락 두절이 되었기때문에, 멈버를 제거합니다.
Cluster.Down(unreachable.Member.Address);
}
else if (message is ClusterEvent.MemberRemoved)
{
var removed = (ClusterEvent.MemberRemoved)message;
Log.Info("Member is Removed: {0}", removed.Member);
}
else if (message is ClusterEvent.IMemberEvent)
{
//IGNORE
}
else if (message is Hello)
{
OnReceiveData(message); //실제 처리할 DataProcess
}
else
{
Unhandled(message);
}
}
} |
시드노드는 앞서 설명했듯이, 마스터 노드에 해당하는것으로 1개가 필요합니다. 다중기기에 서비스를 뛰울시 설정사항이나
구동방식의 차이가 없지만, 학습용 테스트이기때문에 시드노드는 9999 포트로 고정을하고 시드노드에 붙게되는 일반노드는
0포트(동적) 로 스케일아웃 테스트를 하겠습니다. 일반적으로 노드 확장은 동일 포트로 확장을 하는것이니 이 차이점을 인지하고
코드 파악을 하시면 되겠습니다.
public void ClusterUpSeedNode()
{
var baseConfig = ConfigurationFactory.ParseString(@"akka {
actor{
provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
deployment {
/myClusterPoolRouter {
routees.paths = [""/user/myClusterPoolRouter""]
router = round-robin-pool # routing strategy
nr-of-instances = 10 # max number of total routees
cluster {
enabled = on
allow-local-routees = on
use-role = crawler
max-nr-of-instances-per-node = 1
}
}
}
}
remote {
helios.tcp {
port = 9999 #테스트를 위해 시드노드의 포트를 9999로 고정하였습니다.
hostname = 127.0.0.1
}
}
cluster {
seed-nodes = [""akka.tcp://ClusterSystem@127.0.0.1:9999""] # 시드노드의 주소는 알필요가 있으며,자신의 주소와 동일할시 시드+롤 두가지 역할을 합니다.
roles = [""crawler""] # 자신의 역활의 이름에대해 정의합니다.
#role.[""crawler""].min-nr-of-members = 3 # crawler role minimum node count
}
}");
actorSystem = ActorSystem.Create("ClusterSystem", baseConfig);
//var clusterActor2 = actorSystem.ActorOf<EchoActor2>("myClusterPoolRouter");
var router = actorSystem.ActorOf(Props.Create<ClusterActor>().WithRouter(FromConfig.Instance), "myClusterPoolRouter");
Task.Delay(2000).Wait(); //테스트를 위해 멤버쉽의 처리가 완료되고 난 이후 메시지 처리를 진행합니다.
var msg = new Hello("hi");
router.Tell(msg);
router.Tell(msg);
router.Tell(msg);
} |
public void ClusterUpNode()
{
var baseConfig = ConfigurationFactory.ParseString(@"akka {
actor{
provider = ""Akka.Cluster.ClusterActorRefProvider, Akka.Cluster""
deployment {
/myClusterPoolRouter {
routees.paths = [""/user/myClusterPoolRouter""]
router = round-robin-pool # routing strategy
nr-of-instances = 10 # max number of total routees
cluster {
enabled = on
allow-local-routees = on
use-role = crawler
max-nr-of-instances-per-node = 1 #노드 하나가 가질수 있는 최고 인스턴스수입니다. 일반적으로 1을 사용합니다.
}
}
}
}
remote {
helios.tcp {
port = 0 #테스트를 위해, 일반노드의 포트는 동적포트로 지정하였씁니다. 한장비에 여러개의 다중노드가 띄워질시 사용할수있는 전략입니다.
hostname = 127.0.0.1
}
}
cluster {
seed-nodes = [""akka.tcp://ClusterSystem@127.0.0.1:9999""] # 알려진 시드노드를 통해 멤버쉽 처리가 됩니다.
roles = [""crawler""] # roles this member is in
#role.[""crawler""].min-nr-of-members = 3 # crawler role minimum node count #클러스터가 가질 최소 노드 제약수입니다. 최소조건이 만족되어야 클러스터로의 메시지가 작동됩니다.
}
}");
actorSystem = ActorSystem.Create("ClusterSystem", baseConfig);
//var clusterActor2 = actorSystem.ActorOf<EchoActor2>("myClusterPoolRouter");
var router = actorSystem.ActorOf(Props.Create<ClusterActor>().WithRouter(FromConfig.Instance), "myClusterPoolRouter");
Task.Delay(2000).Wait();
var msg = new Hello("hi");
router.Tell(msg);
router.Tell(msg);
router.Tell(msg);
} |
우리는 위 어플리케이션을 다음과 같은 테스트를 할것입니다.
일반 노드를 늘리고/줄이고 함에 따라 자동으로 분산처리가 축소되고 확장되는것을 확인할수가 있습니다.
클러스터 테스트 어플리케이션을 앞장에서 준비했던 이유가... 1프로젝트로 동적으로 다양한 클러스터 환경이 적용된 테스트 어플리케이션을 구동하기 위해서였습니다.
|
위 코드는 다음과같이 설계변경없이 응용가능하며, 직접 응용하여 변경해보십시오