Stream Basic1 - Sum(1~10)
package com.psmon.cachedb.actortest; import org.springframework.stereotype.Component; import akka.*; import akka.stream.*; import akka.stream.javadsl.*; import java.util.Arrays; import java.util.concurrent.CompletionStage; /* ActorStream class */ @Component public class ActorStream extends ActorBase{ public void runAll() { runBasic1(); } protected void runBasic1() { // Stream을 실행시킬수 있는곳 ( 액터시스템이 가지고 있음 ) final Materializer materializer = ActorMaterializer.create(system); final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); // note that the Future is scala.concurrent.Future final Sink<Integer, CompletionStage<Integer>> sink = Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next); // connect the Source to the Sink, obtaining a RunnableFlow final RunnableGraph<CompletionStage<Integer>> runnable = source.toMat(sink, Keep.right()); //Left:읽은쪽을 유지 , Right:기록한쪽을 유지 . Both:둘다 유지 , None : 유지없음 // materialize the flow final CompletionStage<Integer> sum = runnable.run(materializer); try { log.info("==> {}",sum.toCompletableFuture().get()); } catch (Exception e) { //TODO: handle exception } } }
위 코드는 1부터 10까지 각 요소를 sum을 하는 어렵게 작성된 코드입니다.
AkkaStream은 코드만 보면 가독성이 떨어지는것처럼보이고 난해할수 있습니다.
그래서 Flow를 정의하고 그래프를 그려 코드와 일치시키는 가독률을 높이는 연습이 필요합니다.
위 스트림처리보다 더 복잡한것을 점점 추가하면서 처리할것이지만 실제 위 코드에서 더이상
복잡성이 비대하게 증가하지 않는다란점을 샘플코드로 확인및 증명 예정입니다.
Stream Graph
Stream연산을 만드는것은 마치 블록을 조합하는것과 유사합니다.
- Source : 하나의 출력을 갖는 처리단계, 수신할 준비가 될때마다 데이터 요소를 방출합니다.
- Sink : 하나의 입력을 가진 처리단계, 요청된 데이터요소를 받아들입니다.
- RunnableGraph : 양쪽 끝 소스와 싱크가 연결되어 실행준비가된 요소이며, 실행후 소비가 가능해집니다.
Flow가 복잡해지더라도, 코드 작성 복잡성을 최대한 줄일수 있는것이 Akka Stream의 특징이며
점점 복잡해지고 쓸만한 Flow를 가지는 Stream처리기를 작성해보도록 하겠습니다.
Stream Basic2 - 비동기/동기 처리를 혼합하기
비동기적으로( 동시에 여러개 처리가됩니다.) 처리할수 있는 영역 ( 소스+1 연산) 과
동기적으로 처리해야할부분의 ( *2연산 ) 경계를 구분하여 스트림를 처리를 시도해봅시다.
protected void runBasic2(){ final Materializer materializer = ActorMaterializer.create(system); Source.range(1, 3) .map(x -> x + 1).async() .map(x -> x * 2) .to( Sink.foreach( result ->{ log.info("runBasic2 ==> {}", result ); })) .run(materializer); }
Basic1편에서 수행하는 것보다 더 복잡한것을 처리하지만, 실제 작성코드는 더 간결해졌습니다.
함수형 처리방식의 장점을 살린 방식으로, 처리방식의 Flow 방식 파악이 용이해지며
람다식과 함수형의 혼합이 가독성이 떨어지는것같지만 사실은 그렇지 않습니다.
위와같은 처리를 하려면 비동기 완료이벤트를 결합을하던, 스레드 모델을 혼합하던 작성해야할 코드가
비약적으로 늘어나게되며..,이렇게 작성된 코드는 Flow의 부분수정 결합요소변경및 전체 파악이 어려웠다란 점입니다.
물론 이것은 언어적 요소가 항상 해결해주는것이 아닌, 적절한 툴(API)을 활용해야합니다.
Stream Basic 3 - Flow를 분기하고 합하기
칠판에 그려놓은, 스트림처리 Flow(Graph)를 코드로 이질감없이 옮길수 있는것이(혹은반대) Akka Stream 의 목표이며
최근 모던한 언어들(Java9이상,Scala,언랭등)이 가지고 있는 자료구조들은 Stream처리가 되도록 고안이 되어있으며( 컬렉션객체는 아닙니다.)
아래 코드는 실제 작동하는 코드이며, Stream처리를 얼마나 단순하게 할수 있는지 한가지 예입니다.
전통적인 개발방식에서는 비동기 완료에 대한 함수를 수없이 연결하면서 콜백헬을 만들수 있는 부분입니다.
최근 Stream 처리기법들이 함수형 인터페이스를 활용하면서, 사용방법이 유사해졌습니다.
여기서는 Fan(f) 이라는 새로운 것이 등장하였습니다. Fan의 개념은 간단합니다. 이것을 지날때
분기가 일어나거나? 다시 구간이 합쳐진다던지 하는 처리를 하게 됩니다.
위 그래프에대한 Flow에 코드처리는 단 두줄이면 됩니다.
AKK Stream Graph DSL의 강력함을 설명할수 있는 짧은 코드입니다.
- 녹색: 왼쪽에서 오른쪽으로 흐르는 하나의 흐름(혹은 위에서 아래로) - Via
- 노랑: 분기가 갈라지거나 합해지는 구간( bcast : out이 2개 , merger : in 이 2개 ) - Merge
- 레드: 분기 처리 - Bcast
구현샘플
Flow를 각각 구현하고 이것을 다양한 방법으로 결합을 할수가 있습니다.
다음장에서 각각의 구성요소 상세하게 설명을하고, 결합을 하는 방법에대해 알아보겠습니다.
protected void runBasic3(){ final Materializer materializer = ActorMaterializer.create(system); final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5)); final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head(); final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10); final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20); final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString()); final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30); final RunnableGraph<CompletionStage<List<String>>> result = RunnableGraph.fromGraph( GraphDSL // create() function binds sink, out which is sink's out port and builder DSL .create( // we need to reference out's shape in the builder DSL below (in to() function) sink, // previously created sink (Sink) (builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape) final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2)); final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2)); final Outlet<Integer> source = builder.add(in).out(); builder.from(source).via(builder.add(f1)) .viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge) .via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape builder.from(bcast).via(builder.add(f4)).toFanIn(merge); return ClosedShape.getInstance(); })); result.run(materializer); }
참고 URL : https://doc.akka.io/docs/akka/current/stream/stream-graphs.html