스트림처리는 AKKA의 주제에서도 광범위하고 어렵습니다.
배관공 게임도 분명 끝판 스테이지는 어렵듯이 우리는 위와같이 복잡한 스트림처리를 단순화하기위해
그래프 DSL을 이용해보겠습니다.
여러개의 입출력이 필요한 복잡한 스트림 처리를 위해서,
AkkaStream은 그래프 DSL을 사용하여 여러개인 팬인 또는 팬아웃시나리오를
기술할수 있게 해줍니다.
이것은 마치 화이트보드에 그린 프로토콜사양의 그림을
AkkaStream이 제공하는 GraphDSL객체를 활용하여 구성하고 재활용할수 있게합니다.
그리고 그것은 그래프가 의도한대로 실제로 동작을 합니다.
앞으로 이러한 전체적인 아카내 그래프를 사용한 스트리밍 흐름처리에 대해 그래프라고 단순하게 정의하겠습니다.
간단한 그래프를 구현해보기
그래프 설명
- in : 입력은 1~10까지입니다.
- f1 ~f4 : 각 팬을 지날떄마다 +10씩 더합니다.
- bcast : 분기가 일어나는 구간이며, 요소가 분기수만큼 복제가 됩니다.
- merge : 분기된 요소가 다시 집합하는 구간입니다.
코드구현
actorSystem = ActorSystem.Create("ServiceB"); materializer = actorSystem.Materializer(); var g = RunnableGraph.FromGraph(GraphDsl.Create(builder => { var source = Source.From(Enumerable.Range(1, 10)); var sink = Sink.Ignore<int>().MapMaterializedValue(_ => NotUsed.Instance); var sinkConsole = Sink.ForEach<int>(x=>Console.WriteLine(x.ToString())) .MapMaterializedValue(_ => NotUsed.Instance); var broadcast = builder.Add(new Broadcast<int>(2)); var merge = builder.Add(new Merge<int>(2)); var f1 = Flow.Create<int>().Select(x => x + 10); var f2 = Flow.Create<int>().Select(x => x + 10); var f3 = Flow.Create<int>().Select(x => x + 10); var f4 = Flow.Create<int>().Select(x => x + 10); builder.From(source).Via(f1).Via(broadcast).Via(f2).Via(merge).Via(f3).To(sinkConsole); builder.From(broadcast).Via(f4).To(merge); return ClosedShape.Instance; })); g.Run(materializer);
- 녹색: 왼쪽에서 오른쪽으로 흐르는 하나의 흐름(혹은 위에서 아래로)
- 노랑: 분기가 갈라지거나 합해지는 구간( bcast : out이 2개 , merger : in 이 2개 )
- 레드: 분기 처리
스트림처리가 Source에서 발생을 하여, 분기가 발생한후 merge지점에서 다시 같이 흐르기 시작하여
결과처리가 됩니다. 약간 익숙하지 않을수 있지만, 그래프의 요소와 작성되는 코드의 요소가 정확하게
일치하며 실제로 그래프와 동일하게 작동됩니다.
Fan-in/out 처리
Broadcast는 양방향으로 분기하고, Merge는 분기된 흐름을 다시 합할수 있는것을 보았습니다.
이것은 그래프의 요소인 Fan의 역활이며 Flow의 접함점에대한 처리를 할수가 있습니다.
아래와같이 다양한 Fan처리가 가능합니다.
Fan-out
Broadcast<T>
– (1 input, N outputs) given an input element emits to each outputBalance<T>
– (1 input, N outputs) given an input element emits to one of its output portsUnzipWith<In,A,B,...>
– (1 input, N outputs) takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)UnZip<A,B>
– (1 input, 2 outputs) splits a stream of(A,B)
tuples into two streams, one of typeA
and one of typeB
Fan-in
Merge<In>
– (N inputs , 1 output) picks randomly from inputs pushing them one by one to its outputMergePreferred<In>
– likeMerge
but if elements are available onpreferred
port, it picks from it, otherwise randomly fromothers
MergePrioritized<In>
– likeMerge
but if elements are available on all input ports, it picks from them randomly based on theirpriority
ZipWith<A,B,...,Out>
– (N inputs, 1 output) which takes a function of N inputs that given a value for each input emits 1 output elementZip<A,B>
– (2 inputs, 1 output) is aZipWith
specialised to zipping input streams ofA
andB
into an(A,B)
tuple streamConcat<A>
– (2 inputs, 1 output) concatenates two streams (first consume one, then the second one)
그래프를 부분적으로 생성하고 합하기
더 복잡한 스트리밍처리를위해서는 ( 실제로도 스트리밍은 복잡하게 사용될수 있습니다.)
한번에 정의할수도 있지만, 작은단위로 그래프를 생성하고 합할수가있으며 재사용될수 있습니다.