책임이 분리된 팀이 생성한 데이터를 이용해야할때 일반적으로 카프카 연결기를 두고  필요한 데이터를 소비함으로  데이터를 재구성할수 있습니다.

하지만 팀을 넘어 이것이 외부 공급사의 데이터인 경우 다음과 같은 정책적인 문제로 DB를 포함 Kafka를 활용하는것에 제약이 있을수 있습니다.



카프카는 데이터를 제공하는 좋은 기술임에도 불구 양사간 데이터 협약을 한다고 해도 이것을 이용하기는 정책상 어려울수 있습니다.

그래서 여전히 데이터를 제공하는 쪽은 API를 이용할수 있는 접근보안기능과 함께 , API의 이용제약및 이용료가 포함된 API형태로 제공하고 이용하는것이 일반적입니다.


협약된 API를 통해  조회권한 내에서만 이용하여 새로운 서비스에 필요한 데이터를 재 구성할수 있습니다.

이것은 공개된  API라고 할지라도 이용량이 높을시 과금및 호출 제약이 있을수 있습니다.


대용량의 데이터를 일반적인 Rest-API를 이용하여 제약내에서 대용량 분산 처리할수 있는 기술적 주제를 다루어 보겠습니다.


API를 통해 수집해야할 종류는 다음과 같은 것이 될수 있습니다.


API의 한계와 안전한 수집전략

대량의 데이터를 수집하기위해  API방식은 다음과 같은 한계가 존재합니다.


이러한 제약을 해결하기위해서는 다음과 같은 전략이 추가로 필요하게됩니다.

수집명령단위

API를 호출해야하는 수집기가 분산처리 되기위해서 수집명령 단위는 너무 크지도 않고 너무작지도 않은 적절한 수준이여야 하며

이 명령은 실행되는 위치만 고려해야하는것이 아닌 수행되는 시간고려되어 분산처리를 고민해야합니다.


이 단위가 너무 크면 분산처리로 보기 어려우며, 너무 작으면 수집 명령 이벤트 자체의 빈도가 비대해질수도 있습니다.


커피주문기의 간단한 예를 들면 커피주문 명령을 내릴수 있지만~  세부적인 커피 제조명령(얼음을 3개까지 타)까지 할 필요가 없음을 의미합니다.

제조명령까지 내리게되면 수집스케줄러는 커피를 타는 공정을 모두 알아야하며 너무 많은 명령을 관리하게 됩니다.


수집을 처리하는 목적및 효율에 따라 수집명령의 세부명령 수준이 각각 다르게 설계될수 있습니다.


수집분산 단위


작동 위치 분산설명을 위해 채널 A/B로 구분하였으나  수집을 처리하는 수집기는 유연하게 스케일 in/out이 되어야 하기 때문에  위치 투명성을 지원해야합니다.

이것은 어디에 작동되어도 상관없이 수행되어야함을 의미하며 카프카 파티션 구성과 연동되어 유연한 분산처리가 가능을 의미하게 됩니다. 

즉 작업의 A/B의 분산룰은 작업량이 적은 노드에게 자동 분배가 될수 있습니다.


작동시간 분배는 자동이아닌 계획을 할수 있으며 더 우선으로 진행되어야 할 채널이 있는 경우 작동시간 조정을 통해 우선순위를 조정할수도 있습니다.


더 세부적인 Task단위는 수집명령을 받은 수집기에서 명령을 분해해 단일장비를 통해 동시처리를 진행할수 있어야하며

단일장비내에서 동시성과 제약을 고려 복잡한 도메인 처리과정을 액터모델을 통해 다시 세부적으로 표현을 해보겠습니다.

노드 분산처리는 일반적인 카프카의 파티션 분산 전략을 이용하고 있고 카프카의 일반적인 전략이기 때문에 생략을 하며

이것을 카프카가 발생한 메시지의 단일장비 병렬,동시성 처리라고 정의를 해보겠습니다.


단일 시스템에서 액터의 논리적 구성



이제 수집처리기의 액터 구성 설계를 했고 수집처리를 위한 액터메시지 의 흐름을 설계를 해보겠습니다.

액터 메시지 Flow

액터의 이벤트 메시지흐름은 UML_Sequence Diagram을 통해 설명할수 있으며  여기서 작성한 모델과 액터의 작동을  일치시킬수 있습니다.

액터는 각각 순차처리 보장되는 메일박스를 가지고 있으며 참조를 통해 비동기적으로 이벤트를 전송할수 받을수도 있기떄문입니다.

멀티스레드의 동시성 문제를 해결하기위해 메시지큐를 가지고 꺼내어 처리하는 방식이 액티브오브젝트 패턴이며

액터의 작동방식은 액티브오브젝트 패턴과 동일합니다.

카프카 작동방식 역시 액티브오브젝트 패턴으로 분류될수 있으며 이 액티브오브젝트 패턴은 액터모델과 상관없이 게임서버에서는 일반적으로

직접 구현해서 사용하며,  카프카는 웹진영에서 그것을 구현없이 쉽게 이용할수 있는 큐시스템입니다.

이러한 초기 개념은 언랭의 창시자  무려 40년 전인 1973년에 칼 휴이트라는 사람이 제안했던 개념이다. 휴이트의 개념은 얼랭(Erlang)이라는 언어로 구체화되었고

스웨덴 통신회사인 에릭슨(Ericsson)에서 고도의 동시성 프로그램을 구현할 때 실제로 사용되었습니다. - 액티브오브젝트 패턴의 기원


우리의 도메인 이벤트의 흐름에 따라 작동 플로우를 시퀀스 다이어그램을 통해  구현전 설계를 할수 있으며

여기서 만든 모델은 액터구현을 통해 해당 적용할수 있습니다. 


우선 액터모델이 표현하려는 중요 이벤트는 다음과 같이 표현될수 있습니다.



코드속에 설계가 녹아들어 실제 작동되는것을 만드는 실천형 아키텍트를 지향하며 마지막으로 위 설계요소중 중요 구현요소를 살펴보겠습니다.


주요 구현코드

유닛테스트

    @Test
    @DisplayName("TPS 분배테스트")
    public void TPSTest() throws Exception{

        new TestKit(system){{
            final TestKit probe = new TestKit(system);
            processorActor = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system).props("processorActor"), "processor");
            for(int i=0 ; i < 100 ; i++){
                processorActor.tell(new TestCollectWorkEvent("MallA","test1", 1, probe.getRef()), ActorRef.noSender());
                processorActor.tell(new TestCollectWorkEvent("MallB","test1", 1, probe.getRef()), ActorRef.noSender());
                processorActor.tell(new TestCollectWorkEvent("MallC","test1", 1, probe.getRef()), ActorRef.noSender());
                processorActor.tell(new TestCollectWorkEvent("MallE","test1", 1, probe.getRef()), ActorRef.noSender());
            }

            int testCount = 100 *4;
            within(
                    Duration.ofSeconds(100),
                    () -> {
                        for (int i = 0; i < testCount; i++) {
                            // check that the probe we injected earlier got the msg
                            probe.expectMsg(Duration.ofSeconds(5), "done");
                        }
                        // Will wait for the rest of the 3 seconds
                        expectNoMessage();
                        return null;
                    });
        }};
    }


논리적 단위로 액터모델 동적생성

                .match(CollectorMessage.class, event -> {
                    ActorRef workActor = mallWorkActors.get(event.getMallId());
                    if(workActor == null){
                        workActor = context().actorOf(SPRING_EXTENSION_PROVIDER.get(getContext().getSystem()).props(
                                "mallWorkActor"), "MallWorkActor-" + event.getMallId());
                        mallWorkActors.put(event.getMallId(), workActor);
                    }
                    workActor.tell(event, ActorRef.noSender());
                })

작업자 개수와 속도조절기 탑재

        // API Work Node -5
        List<String> paths = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            ActorRef r = context().actorOf(SPRING_EXTENSION_PROVIDER.get(getContext().getSystem()).props("apiWorkActor"), "apiWorkActor-" + i);
            paths.add( "/user/" + context().parent().path().name()  + "/" + context().self().path().name()+ "/" + r.path().name() );
        }

        apiWorkRouter = getContext().actorOf(new RoundRobinGroup(paths).props(), "apiWorkRouter");

        // Api TPS2 per MALL
        int processCouuntPerSec = 2;
        throttler =
                Source.actorRef(50000, OverflowStrategy.dropNew())
                        .throttle(processCouuntPerSec, FiniteDuration.create(1, TimeUnit.SECONDS),
                                processCouuntPerSec, (ThrottleMode) ThrottleMode.shaping())
                        .to(Sink.actorRef(apiWorkRouter, akka.NotUsed.getInstance()))
                        .run(materializer);

연관 참고자료

NEXT