You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »



추가 종속(메이븐)
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream_2.12</artifactId>
            <version>${akka.version}</version>
        </dependency>        


Stream Basic1 - Sum(1~10)

package com.psmon.cachedb.actortest;

import org.springframework.stereotype.Component;
import akka.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;

/*
  ActorStream class
 */
@Component
public class ActorStream extends ActorBase{
  public void runAll() {
    runBasic1();          
  }
  
  protected void runBasic1() {
    // Stream을 실행시킬수 있는곳 ( 액터시스템이 가지고 있음 )
    final Materializer materializer = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> source =
    Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    // note that the Future is scala.concurrent.Future

    final Sink<Integer, CompletionStage<Integer>> sink =
        Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);

    // connect the Source to the Sink, obtaining a RunnableFlow
    final RunnableGraph<CompletionStage<Integer>> runnable =
        source.toMat(sink, Keep.right());  //Left:읽은쪽을 유지 , Right:기록한쪽을 유지 . Both:둘다 유지 , None : 유지없음

    // materialize the flow
    final CompletionStage<Integer> sum = runnable.run(materializer);

    try {      
      log.info("==> {}",sum.toCompletableFuture().get());    
    } catch (Exception e) {
      //TODO: handle exception
    }
  }
}


위 코드는 1부터 10까지 각 요소를  sum을 하는 어려운 코드로 보일수 있으나

Stream을 설명을 할수 있는 가장 심플한 코드입니다. 

AkkaStream은 코드만 보면 가독성이 떨어지는것처럼보이고 난해할수 있습니다.

그래서 그래프를 그려 가독률을 높이는 연습이 필요합니다.  위 코드를 그래프와 연관시켜 보겠습니다.

Stream Graph

Stream연산을 만드는것은 마치 블록을 조합하는것과 유사합니다.

  • Source : 하나의 출력을 갖는 처리단계, 수신할 준비가 될때마다 데이터 요소를 방출합니다.
  • Sink : 하나의 입력을 가진 처리단계, 요청된 데이터요소를 받아들입니다.
  • RunnableGraph : 양쪽 끝 소스와 싱크가 연결되어 실행준비가된 요소이며, 실행후 소비가 가능해집니다.


Flow가 복잡해지더라도, 코드 작성 복잡성을 최대한 줄일수 있는것이 Akka Stream의 특징이며

점점 복잡해지고 쓸만한 Flow를 가지는 Stream처리기를 작성해보도록 하겠습니다.


Stream Basic2 - 비동기/동기 처리를 혼합하기

비동기적으로( 동시에 여러개 처리가됩니다.) 처리할수 있는 영역 ( 소스+1 연산) 과

동기적으로 처리해야할부분의 ( *2연산 ) 경계를 구분하여 스트림를 처리를 시도해봅시다.


  protected void runBasic2(){
    final Materializer materializer = ActorMaterializer.create(system);
    Source.range(1, 3)
      .map(x -> x + 1).async()
      .map(x -> x * 2)
      .to( Sink.foreach( result ->{
        log.info("runBasic2 ==> {}", result );
      }))
      .run(materializer);
  }

2018-07-29 23:13:32,053 INFO - runBasic2 ==> 4
2018-07-29 23:13:32,054 INFO - runBasic2 ==> 6
2018-07-29 23:13:32,055 INFO - runBasic2 ==> 8

Basic1편에서 수행하는 것보다 더 복잡한것을 처리하지만, 실제 작성코드는 더 간결해졌습니다.

다음에 수행할 스트림을 분기하고 합하는 과정역시 코드의 복잡성이 크게 증가하지 않는다란 점에서

Akka에서 스트림처리를 위해 추상화한 수준이 신기방기합니다. 


Stream Basic 3 - Flow를 분기하고 합하기




  • No labels