Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEmacs
package com.psmon.cachedb.actortest;

import org.springframework.stereotype.Component;
import akka.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

/*
  ActorStream class
 */
@Component
public class ActorStream extends ActorBase{
  public void runAll() {
    runBasic1();          
  }
  
  protected void runBasic1() {
    // Stream을 실행시킬수 있는곳 ( 액터시스템이 가지고 있음 )
    final Materializer materializer = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> source =
    Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    // note that the Future is scala.concurrent.Future

    final Sink<Integer, CompletionStage<Integer>> sink =
        Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);

    // connect the Source to the Sink, obtaining a RunnableFlow
    final RunnableGraph<CompletionStage<Integer>> runnable =
        source.toMat(sink, Keep.right());  //Left:읽은쪽을 유지 , Right:기록한쪽을 유지 . Both:둘다 유지 , None : 유지없음

    // materialize the flow
    final CompletionStage<Integer> sum = runnable.run(materializer);

    try {      
      log.info("==> {}",sum.toCompletableFuture().get());    
    } catch (Exception e) {
      //TODO: handle exception
    }
  }
}


위 코드는 1부터 10까지 각 요소를  sum을 하는 어렵게 작성된 요소를 AkkaStream를 사용하여 SUM을 하는 코드입니다.

AkkaStream은 코드만 보면 가독성이 떨어지는것처럼보이고 단순한 흐름처리 에서 AkkaStream은 다소 난해할수 있습니다.

그래서 Flow를 정의하고 그래프를 그려 코드와 일치시키는 가독률을 높이는 연습이 필요합니다.

위 스트림처리보다 더 복잡한것을 점점 추가하면서 처리할것이지만 실제 위 코드에서 더이상


AkkaStream은 흐름처리에 필요한 몇가지 요소를 제공하고 레고처럼 조립할수 있습니다.

복잡성이 비대하게 증가하지 않는다란점을 샘플코드로 확인및 증명 예정입니다.

Stream Graph

Stream연산을 만드는것은 마치 블록을 조합하는것과 유사합니다.

  • Source : 하나의 출력을 갖는 처리단계, 수신할 준비가 될때마다 데이터 요소를 방출합니다.
  • Sink : 하나의 입력을 가진 처리단계, 요청된 데이터요소를 받아들입니다.
  • RunnableGraph : 양쪽 끝 소스와 싱크가 연결되어 실행준비가된 요소이며, 실행후 소비가 가능해집니다.


Flow가 복잡해지더라도, 코드 작성 코드  복잡성을 최대한 줄일수 있는것이 Akka Stream의 특징이며

점점 복잡해지고 조금더  쓸만한 Flow를 가지는 Stream처리기를 작성해보도록 하겠습니다.

...

Stream Basic 3 - Flow를 분기하고 합하기

칠판에 화이트 보드에 그려놓은, 스트림처리 Flow(Graph)를 코드로 이질감없이 옮길수 있는것이(혹은반대) Akka Stream 의 목표이며최근 모던한 언어들(Java9이상,Scala,언랭등)이 가지고 있는 자료구조들은 Stream처리가 되도록 고안이 되어있으며( 컬렉션객체는 아닙니다.)목표입니다.


아래 코드는 실제 작동하는 코드이며, 복합적인 Stream처리를 얼마나 단순하게 할수 있는지 한가지 예입니다.

전통적인 개발방식에서는 비동기 완료에 대한 함수를 수없이 연결하면서 콜백헬을 만들수 비동기와 동기처리를 혼합사용하면서 콜백헬을 만들수도 있는 부분입니다. 


최근 Stream 처리기법들이 함수형  인터페이스를 함수형 개발패턴을 활용하면서,  사용방법이 유사해졌습니다.

여기서는 이 샘플에서는 Fan(f) 이라는 새로운 것이 등장하였습니다요소를 사용하였습니다.
Fan의 개념은 간단합니다. 이것을 지날때 분기가 일어나거나? 다시 구간이 합쳐진다던지 하는 처리를 하게 됩니다. 

...

Flow를 각각 구현하고 이것을 다양한 방법으로 결합을 조립을 할수가 있습니다.

다음장에서 각각의 구성요소 상세하게 설명을하고, 결합을 하는 방법에대해 알아보겠습니다.

Code Block
languagejava
themeEmacs
  protected void runBasic3(){
    final Materializer materializer = ActorMaterializer.create(system);
    final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
    final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
    final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
    final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
    final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
    final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);


    final RunnableGraph<CompletionStage<List<String>>> result =
            RunnableGraph.fromGraph(
                    GraphDSL     // create() function binds sink, out which is sink's out port and builder DSL
                            .create(   // we need to reference out's shape in the builder DSL below (in to() function)
                                    sink,                // previously created sink (Sink)
                                    (builder, out) -> {  // variables: builder (GraphDSL.Builder) and out (SinkShape)
                                      final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
                                      final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));

                                      final Outlet<Integer> source = builder.add(in).out();
                                      builder.from(source).via(builder.add(f1))
                                              .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
                                              .via(builder.add(f3.grouped(1000))).to(out);  // to() expects a SinkShape
                                      builder.from(bcast).via(builder.add(f4)).toFanIn(merge);

                                      return ClosedShape.getInstance();
                                    }));

      result.run(materializer);
  }



참고 URL :