Page History
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_2.12</artifactId> <version>${akka.version}</version> </dependency> |
Stream Basic1 - Sum(1~10)
Code Block | ||||
---|---|---|---|---|
| ||||
package com.psmon.cachedb.actortest; import org.springframework.stereotype.Component; import akka.*; import akka.stream.*; import akka.stream.javadsl.*; import java.util.Arrays; import java.util.concurrent.CompletionStage; /* ActorStream class */ @Component public class ActorStream extends ActorBase{ public void runAll() { runBasic1(); } protected void runBasic1() { // Stream을 실행시킬수 있는곳 ( 액터시스템이 가지고 있음 ) final Materializer materializer = ActorMaterializer.create(system); final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); // note that the Future is scala.concurrent.Future final Sink<Integer, CompletionStage<Integer>> sink = Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next); // connect the Source to the Sink, obtaining a RunnableFlow final RunnableGraph<CompletionStage<Integer>> runnable = source.toMat(sink, Keep.right()); //Left:읽은쪽을 유지 , Right:기록한쪽을 유지 . Both:둘다 유지 , None : 유지없음 // materialize the flow final CompletionStage<Integer> sum = runnable.run(materializer); try { log.info("==> {}",sum.toCompletableFuture().get()); } catch (Exception e) { //TODO: handle exception } } } |
...
복잡성이 비대하게 증가하지 않는다란점을 샘플코드로 확인및 증명 예정입니다.
Stream Graph
Stream연산을 만드는것은 마치 블록을 조합하는것과 유사합니다.
...
점점 복잡해지고 쓸만한 Flow를 가지는 Stream처리기를 작성해보도록 하겠습니다.
Stream Basic2 - 비동기/동기 처리를 혼합하기
비동기적으로( 동시에 여러개 처리가됩니다.) 처리할수 있는 영역 ( 소스+1 연산) 과
...
물론 이것은 언어적 요소가 항상 해결해주는것이 아닌, 적절한 툴(API)을 활용해야합니다.
Stream Basic 3 - Flow를 분기하고 합하기
칠판에 그려놓은, 스트림처리 Flow(Graph)를 코드로 이질감없이 옮길수 있는것이(혹은반대) Akka Stream 의 목표이며
...
- 녹색: 왼쪽에서 오른쪽으로 흐르는 하나의 흐름(혹은 위에서 아래로) - Via
- 노랑: 분기가 갈라지거나 합해지는 구간( bcast : out이 2개 , merger : in 이 2개 ) - Merge
- 레드: 분기 처리 - Bcast
구현샘플
Flow를 각각 구현하고 이것을 다양한 방법으로 결합을 할수가 있습니다.
...