Stream Basic1 - Sum(1~10)

package com.psmon.cachedb.actortest;

import org.springframework.stereotype.Component;
import akka.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

/*
  ActorStream class
 */
@Component
public class ActorStream extends ActorBase{
  public void runAll() {
    runBasic1();          
  }
  
  protected void runBasic1() {
    // Stream을 실행시킬수 있는곳 ( 액터시스템이 가지고 있음 )
    final Materializer materializer = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> source =
    Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    // note that the Future is scala.concurrent.Future

    final Sink<Integer, CompletionStage<Integer>> sink =
        Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);

    // connect the Source to the Sink, obtaining a RunnableFlow
    final RunnableGraph<CompletionStage<Integer>> runnable =
        source.toMat(sink, Keep.right());  //Left:읽은쪽을 유지 , Right:기록한쪽을 유지 . Both:둘다 유지 , None : 유지없음

    // materialize the flow
    final CompletionStage<Integer> sum = runnable.run(materializer);

    try {      
      log.info("==> {}",sum.toCompletableFuture().get());    
    } catch (Exception e) {
      //TODO: handle exception
    }
  }
}


위 코드는 1부터 10까지 각 요소를 AkkaStream를 사용하여 SUM을 하는 코드입니다.

단순한 흐름처리 에서 AkkaStream은 다소 난해할수 있습니다.


AkkaStream은 흐름처리에 필요한 몇가지 요소를 제공하고 레고처럼 조립할수 있습니다.

Stream Graph

Stream연산을 만드는것은 마치 블록을 조합하는것과 유사합니다.

  • Source : 하나의 출력을 갖는 처리단계, 수신할 준비가 될때마다 데이터 요소를 방출합니다.
  • Sink : 하나의 입력을 가진 처리단계, 요청된 데이터요소를 받아들입니다.
  • RunnableGraph : 양쪽 끝 소스와 싱크가 연결되어 실행준비가된 요소이며, 실행후 소비가 가능해집니다.


Flow가 복잡해지더라도, 코드  복잡성을 최대한 줄일수 있는것이 Akka Stream의 특징이며

몇가지 복합 Flow를 가지는 Stream처리기를 작성해보도록 하겠습니다.


Stream Basic2 - 비동기/동기 처리를 혼합하기

비동기적으로( 동시에 여러개 처리가됩니다.) 처리할수 있는 영역 ( 소스+1 연산) 과

동기적으로 처리해야할부분의 ( *2연산 ) 경계를 구분하여 스트림를 처리를 시도해봅시다.


  protected void runBasic2(){
    final Materializer materializer = ActorMaterializer.create(system);
    Source.range(1, 3)
      .map(x -> x + 1).async()
      .map(x -> x * 2)
      .to( Sink.foreach( result ->{
        log.info("runBasic2 ==> {}", result );
      }))
      .run(materializer);
  }

2018-07-29 23:13:32,053 INFO - runBasic2 ==> 4
2018-07-29 23:13:32,054 INFO - runBasic2 ==> 6
2018-07-29 23:13:32,055 INFO - runBasic2 ==> 8

Basic1편에서 수행하는 것보다 더 복잡한것을 처리하지만, 실제 작성코드는 더 간결해졌습니다.

함수형 처리방식의 장점을 살린 방식으로,  처리방식의 Flow 방식 파악이 용이해지며


람다식과 함수형의 혼합이 가독성이 떨어지는것같지만 사실은 그렇지 않습니다.

위와같은 처리를 하려면 비동기 완료이벤트를 결합을하던, 스레드 모델을 혼합하던 작성해야할 코드가

비약적으로 늘어나게되며..,이렇게 작성된 코드는 Flow의 부분수정 결합요소변경및 전체 파악이 어려웠다란 점입니다.

물론 이것은 언어적 요소가 항상 해결해주는것이 아닌, 적절한 툴(API)을 활용해야합니다.

Stream Basic 3 - Flow를 분기하고 합하기

화이트 보드에 그려놓은, 스트림처리 Flow(Graph)를 코드로 이질감없이 옮길수 있는것이(혹은반대) Akka Stream 의 목표입니다.


아래 코드는 실제 작동하는 코드이며, 복합적인 Stream처리를 얼마나 단순하게 할수 있는지 한가지 예입니다.

전통적인 개발방식에서는 비동기와 동기처리를 혼합사용하면서 콜백헬을 만들수도 있는 부분입니다.


최근 Stream 처리기법들이 함수형 개발패턴을 활용하면서,  사용방법이 유사해졌습니다.

이 샘플에서는 Fan(f) 이라는 새로운 요소를 사용하였습니다.
Fan의 개념은 이것을 지날때 분기가 일어나거나? 다시 구간이 합쳐진다던지 하는 처리를 하게 됩니다. 


위 그래프에대한 Flow에  코드처리는 단 두줄이면 됩니다. 

AKK Stream Graph DSL의 강력함을 설명할수 있는 짧은 코드입니다.

  • 녹색: 왼쪽에서 오른쪽으로 흐르는 하나의 흐름(혹은 위에서 아래로)  - Via
  • 노랑: 분기가 갈라지거나 합해지는 구간(  bcast : out이 2개 , merger : in 이 2개 ) - Merge
  • 레드:  분기 처리 - Bcast


구현샘플

Flow를 각각 구현하고 이것을 다양한 방법으로 조립을 할수가 있습니다.

다음장에서 각각의 구성요소 상세하게 설명을하고, 결합을 하는 방법에대해 알아보겠습니다.

  protected void runBasic3(){
    final Materializer materializer = ActorMaterializer.create(system);
    final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
    final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
    final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
    final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
    final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
    final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);


    final RunnableGraph<CompletionStage<List<String>>> result =
            RunnableGraph.fromGraph(
                    GraphDSL     // create() function binds sink, out which is sink's out port and builder DSL
                            .create(   // we need to reference out's shape in the builder DSL below (in to() function)
                                    sink,                // previously created sink (Sink)
                                    (builder, out) -> {  // variables: builder (GraphDSL.Builder) and out (SinkShape)
                                      final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
                                      final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));

                                      final Outlet<Integer> source = builder.add(in).out();
                                      builder.from(source).via(builder.add(f1))
                                              .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
                                              .via(builder.add(f3.grouped(1000))).to(out);  // to() expects a SinkShape
                                      builder.from(bcast).via(builder.add(f4)).toFanIn(merge);

                                      return ClosedShape.getInstance();
                                    }));

      result.run(materializer);
  }



참고 URL :






  • No labels
Write a comment…