Actor와 같은 비동기 메시지 객체를 이용하기위해서는, 그것을 호출하는 쪽에서도 비동기처리가 가능한

언어 스펙을 지원해야한다는것입니다. 이것은 AKKA를 이용하지 않더라도, 외부 비동기처리 메시지시스템과

연동을 할때도 중요한 요소입니다. Actor 메시지처리기법은 몰라도 되지만

JAVA가 제공하는 중요한 비동기처리 방식에대한 이해는 더 중요한 요소입니다.


Future(미래)와 Promice(약속)은 비동기처리에 있어서 중요한 개념입니다.

미래에는 블록킹이 없기때문에 어느시점 사용가능한 함수의 결과(성공또는 실패등을)

를 담을수 있는 용기라고 보면 됩니다.


이것은 AKKA의 요소가아닌 기존 개발 플랫폼의 비동기처리 요소로

나중에 Actor, Akka Stream등과 연동되는 중요한 요소로 사용이 되기때문에 Actor 학습에 앞서

기존 JAVA가 가진 비동기 처리기능에대해 정리하고 가겠습니다.

참고 URL


자바7의 java.util.concurrent.Future에 익숙하다면 scala.concurrent.Future가 자바 클래스를 감싼것으로 생각할수도 있지만

실제로는 그렇지 않다. java.util.concurrent.Future 클래스는 폴링을 필요로하며 결과를 얻기위해 블로킹 get 메서드를 사용해야만

한다. 하지만 스칼라의 퓨쳐는 블로킹이나 풀링을 사용하지 않고 함수결과를 조합할수 있으며

JAVA8의 CompletetableFuture<T>가 오히려 여기서 설명하는 퓨쳐와 유사하다.


Future


퓨쳐는 읽기전용이며, 퓨쳐에 담긴 값을 외부에서 변경할수 없습니다. 함수 실행이 끝나면

퓨쳐에 성공 또는 실패에 따른 결과가 들어갑니다. 퓨쳐에 담긴 결과는 완료된 다음부터는 

바뀌지 않으며, 외부에서 여러번 읽을수 있고 매번 같은 값을 돌려줍니다.

결과가 들어갈 용기가 있으므로 비동기적으로 실행되는 여러 함수들을 쉽게 조합할수가 있습니다.


퓨쳐는 여러함수의 결과가 다음 함수의 입력이되어야할시, 여러 함수가 병렬로 연결되며

나중에 여러 함수의 결과를 조합해야하는 파이프라이닝(pipelining)에 좋은 도구입니다.


Sample

Ff963556.dda12864-f9bd-4340-831f-212c4ee8bc4a(en-us,PandP.10).png

위와같은 형태의 조합 처리 되는 로직을 Future를 활용해 구현해보겠습니다.

import java.util.concurrent.CompletableFuture;


//선언부
@FunctionalInterface
interface FuncB {
    public int calc(int a, int b);
}
 
@FunctionalInterface
interface FuncA {
    public int calc(int a);
}

//사용부 
FuncA F1 = (int a) -> a+1;
FuncA F2 = (int a) -> a+1;
FuncA F3 = (int a) -> a+1;
FuncB F4 = (int a,int b) -> a+b;
             
Integer input=1;
CompletableFuture<Integer> futureB
        = CompletableFuture.supplyAsync(() -> F1.calc(1) );

CompletableFuture<Integer> futureD
        = CompletableFuture.supplyAsync(() -> F3.calc(F2.calc(1))  );

CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(futureB, futureD)
        .thenAccept( r-> {
            // 최종 연산로직을 비동기로 처리한 케이스
            Integer value1 = futureB.join();
            Integer value2 = futureD.join();
            log( String.format("async result:%d", value1+value2 ));
        });
combinedFuture.join(); //연산을 비동기적으로 시작하려면 join명령을 사용합니다.
// 최종 연산로직을 동기처리로 변환한 예
combinedFuture.get();	//동기처리로 진행하려면 get을 사용합니다.
Integer sresult = futureB.get() + futureD.get();
log( String.format("sync result:%d",sresult) );


여러연산을 결합하기위해서는, 람다식 사용을 강요받습니다.

람다식에 익숙해지면 , 복잡한 비동기 연산처리에대해 간결한 코드로 작성이 가능해집니다.


Graph

일반적인 비동기 조합처리를 위해서 JAVA의 CompletetableFuture 로도 충분하지만


simple-graph-example.png

위와같이 훨씬더 복잡한 연산처리를 비동적으로 처리한다고 가정하면 CompletetableFuture는 한계에 있을수도 있습니다.

이경우 AKKA GRAPH DSL을 사용하여 , 비동기처리의 흐름을 제어할수 있습니다.

비동기처리가 스트림으로 확장된형태 라고볼수 있습니다.


/* construct protocol stack
 *         +------------------------------------+
 *         | stack                              |
 *         |                                    |
 *         |  +-------+            +---------+  |
 *    ~>   O~~o       |     ~>     |         o~~O    ~>
 * Message |  | codec | ByteString | framing |  | ByteString
 *    <~   O~~o       |     <~     |         o~~O    <~
 *         |  +-------+            +---------+  |
 *         +------------------------------------+
 */
val stack = codec.atop(framing)

// test it by plugging it into its own inverse and closing the right end
val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) }
val flow = stack.atop(stack.reversed).join(pingpong)
val result = Source((0 to 9).map(Ping)).via(flow).limit(20).runWith(Sink.seq)
Await.result(result, 1.second) should ===((0 to 9).map(Pong))

위 코드는 복잡한 처리의 비동기처리의 흐름이 GRAPH DSL기법을 사용하여 간단하게

처리하는 한가지 예입니다. 외부에서 처리된 결과를 다시 조합하여 자신에게 출력이될수

있게하는 것이  양방향 흐름 제어(bidirectional flow) 라고 하며

이것을 Java가 제공하는 Futured로만으로는 구현하기 아주 복잡해질수 있습니다.   

현재는 이러한것이 있다라고만 알아두고 넘어가겠습니다.


참고링크: https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-graphs.html



  • No labels
Write a comment…