Page History
...
- 녹색: 왼쪽에서 오른쪽으로 흐르는 하나의 흐름(혹은 위에서 아래로) - Via
- 노랑: 분기가 갈라지거나 합해지는 구간( bcast : out이 2개 , merger : in 이 2개 ) - Merge
- 레드: 분기 처리 - Bcast
구현샘플
Flow를 각각 구현하고 이것을 다양한 방법으로 결합을 할수가 있습니다.
다음장에서 각각의 구성요소 상세하게 설명을하고, 결합을 하는 방법에대해 알아보겠습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
protected void runBasic3(){
final Materializer materializer = ActorMaterializer.create(system);
final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);
final RunnableGraph<CompletionStage<List<String>>> result =
RunnableGraph.fromGraph(
GraphDSL // create() function binds sink, out which is sink's out port and builder DSL
.create( // we need to reference out's shape in the builder DSL below (in to() function)
sink, // previously created sink (Sink)
(builder, out) -> { // variables: builder (GraphDSL.Builder) and out (SinkShape)
final UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
final UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));
final Outlet<Integer> source = builder.add(in).out();
builder.from(source).via(builder.add(f1))
.viaFanOut(bcast).via(builder.add(f2)).viaFanIn(merge)
.via(builder.add(f3.grouped(1000))).to(out); // to() expects a SinkShape
builder.from(bcast).via(builder.add(f4)).toFanIn(merge);
return ClosedShape.getInstance();
}));
result.run(materializer);
} |
참고 URL : https://doc.akka.io/docs/akka/current/stream/stream-graphs.html
...