Page History
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.12</artifactId> <version>${akka.version}</version> </dependency> |
Stream
...
Basic1 - Sum(1~10)
Code Block | ||||
---|---|---|---|---|
| ||||
package com.psmon.cachedb.actortest;
import org.springframework.stereotype.Component;
import akka.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;
/*
ActorStream class
*/
@Component
public class ActorStream extends ActorBase{
public void runAll() {
runBasic1();
}
protected void runBasic1() {
// Stream을 실행시킬수 있는곳 ( 액터시스템이 가지고 있음 )
final Materializer materializer = ActorMaterializer.create(system);
final Source<Integer, NotUsed> source =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
// note that the Future is scala.concurrent.Future
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);
// connect the Source to the Sink, obtaining a RunnableFlow
final RunnableGraph<CompletionStage<Integer>> runnable =
source.toMat(sink, Keep.right()); //Left:읽은쪽을 유지 , Right:기록한쪽을 유지 . Both:둘다 유지 , None : 유지없음
// materialize the flow
final CompletionStage<Integer> sum = runnable.run(materializer);
try {
log.info("==> {}",sum.toCompletableFuture().get());
} catch (Exception e) {
//TODO: handle exception
}
}
} |
위 코드는 1부터 10까지 각 요소를 sum을 하는 어려운 코드로 보일수 있으나
Stream을 설명을 할수 있는 가장 심플한 코드입니다.
AkkaStream은 코드만 보면 가독성이 떨어지는것처럼보이고 난해할수 있습니다.
그래서 그래프를 그려 가독률을 높이는 연습이 필요합니다. 위 코드를 그래프와 연관시켜 보겠습니다.
Stream Graph
Stream연산을 만드는것은 마치 블록을 조합하는것과 유사합니다.
- Source : 하나의 출력을 갖는 처리단계, 수신할 준비가 될때마다 데이터 요소를 방출합니다.
- Sink : 하나의 입력을 가진 처리단계, 요청된 데이터요소를 받아들입니다.
- RunnableGraph : 양쪽 끝 소스와 싱크가 연결되어 실행준비가된 요소이며, 실행후 소비가 가능해집니다.
Flow가 복잡해지더라도, 코드 작성 복잡성을 최대한 줄일수 있는것이 Akka Stream의 특징이며
점점 복잡해지고 쓸만한 Flow를 가지는 Stream처리기를 작성해보도록 하겠습니다.