Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Info

스프링 클라우드에서 Eureka - Zuul - Ribon 으로 연결되는 다이나믹하게 노드를 늘리고 로드밸런스를 지원하는 휼륭한 툴을 지원한다. 메시지큐와 연동되어 도메인이벤트도

원격처리가 되는듯하나. RestAPI에서 한해 유용해보인다. RestAPI가 아닌 원격 비동기 대량처리 스트림처리부분은 AKKA의 메시징 기법이 직관적이고 안정적이고 성능에 이점이 있는듯 하며

Kafka와 같은 외부 메시툴연동하기에도 더 이점이 있다. 더욱이 기존 구성된 마이크로서비스에 임베디드 되어 분산처리하는 방식이라 추가적인 큐시스템을 구축하지 않아도된다.

목적이 다른 플랫폼의 상호연동은 어려운 주제이기도 하지만, 잘 조화를 이루면 서로의 약점을 보완해줄수 있을것으로 기대해봅니다.


클러스터의 이해

클러스터를 구축하기전 클러스터에대한 이해가 필요합니다. 클러스터는 AKKA에서 생겨난 컨셉이 아니며

...

이 차이를 잘 구분하면 클러스터를 이용할 준비가 된것입니다.


AKKA 사용 골

Actor1에 작업의뢰를 했지만 원격에 떨어진 Actor2의 결과를 비동기처리를 할수 있으며 이것은 Ribon의 원격명령에의해

...

물론 일반적으로 외부 메시지큐는 카프카또는 레빗엠큐를 사용하여  도메인 이벤트의 전달 툴로 대체할수 있으며 AKKA일 필요는 없습니다.  


구현샘플 : 충돌없는 복제 

우선 이 기능이 유용함을 설명하기 위해 다음과 같이 보편적 도메인 목표를 지정하였습니다.

...

제공될수 있습니다. 여기서는 액터를 통해 충돌없는 데이터복제를 한후 각각의 노드에서 RestAPI로 제공을 받는 방법을 선택해보겠습니다.


시드를 책임지는 디스커버리 앱설정

git : https://github.com/psmon/springcloud/tree/master/akka-cluster

...

멤버가 가입되고 탈퇴하는 모든 메시지를 잡담을 통해 서로를 인지할수 있고 인지를 하기 때문에 목적지의 IP를

알필요없이 라우터를 통한 호출이 가능합니다.


요청시 데이터 를 통기화하는 액터

Code Block
languagejava
themeEmacs
@Override
public void preStart() {
    // first read for sync , After that no more db load
    tableRepository.findAllTableInfos().forEach(tableEntity -> {
        TableInfo tableInfo = tableEntity.toTableInfo();
        tableInfos.add(tableInfo);
    });
}

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(TableCMD.class, s -> {
                if(s.cmdType==TableCMD.TableCMDType.SYNC_FIRST){
                    log.info("Received TableCMD message: {}", s);
                    ActorRef requestActor = getSender();
                    seqNumSync++;
                    requestActor.tell(new TableInfoList(seqNumSync,tableInfos),ActorRef.noSender());
                }
            })
            .matchAny(o -> log.info("received unknown message"))
            .build();
}


//Config
cluster {
  seed-nodes = [
    "akka.tcp://ClusterSystem@127.0.0.1:2551"
  ]

  roles = ["datacenter"]

...

여기서는 최초의 저장소의 데이터만 전달해주는기능만 있으며, 이것은 Update,Insert,Delete등의 기능과 함께 변경부 전달이라는 기능을 추가할수 있습니다. 


노드 가입시 자동 데이터복제

Code Block
languagejava
themeEmacs
//Create a TableSync object when it joins the cluster
Cluster.get(system).registerOnMemberUp(new Runnable() {
	@Override
	public void run() {
		system.actorOf(TableInfoActor.props(),"lobbyTableSync");
	}
});


cluster {
  seed-nodes = [
    "akka.tcp://ClusterSystem@127.0.0.1:2551"
  ]

  roles = ["lobby"]

  role{
    seed.min-nr-of-members=1
    datacenter.min-nr-of-members=1
    lobby.min-nr-of-members=1
  }

...

이 충족되었을때 동기화를 담당하는 액터를 생성하는 것입니다. 또한 자신의 롤은 lobby임을 설정하는것입니다.


동기화를 요청하는 액터

Code Block
languagejava
themeEmacs
ActorRef dataCenter = getContext().actorOf(FromConfig.getInstance().props(),
        "dcTableSyncRouter");


@Override
public void preStart() {
    // first requestData
    sendRefreshTable(TableCMD.TableCMDType.SYNC_FIRST);
    getContext().setReceiveTimeout(Duration.create(10, TimeUnit.SECONDS));
}

private void sendRefreshTable(TableCMD.TableCMDType cmdType) {
    dataCenter.tell(new TableCMD(cmdType),getSelf() );
}

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(TableInfoList.class, tableList -> {
                tableInfos = tableList.getValues();
                log.info(String.format("===== first Table sync, count:%d",tableInfos.size()));
            })
            .match(ReceiveTimeout.class, message -> {
                log.info("=== Timer");
                sendRefreshTable(TableCMD.TableCMDType.SYNC_FIRST);
            })
            .match(TableCMD.class, cmd -> {
                if(cmd.cmdType == TableCMD.TableCMDType.SYNC_FIRST)
                sender().tell(tableInfos,null);
            })
            .matchAny(o -> log.info("received unknown message"))
            .build();
}

...