기본
using Akka; using Akka.Streams; using Akka.Streams.Dsl; actorSystem = ActorSystem.Create("ServiceB"); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); using (var materializer = actorSystem.Materializer()) { source.RunForeach(i => Console.WriteLine(i.ToString()), materializer); Console.WriteLine("Loop가 끝나지 않아도 동시에 실행됨"); }
위 실행결과는 1부터 100까지 숫자를 단순하게 콘솔로 찍어주는것입니다.
for(int srcNum = 0; srcNum < 100; srcNum++) Console.WriteLine(srcNum.ToString()); Console.WriteLine("Loop가 끝나야 실행됨");
수행 결과만 보면 전통적인 개발방법에서 사용한 결과랑 다를바가 없습니다.
단순하게 코드량만 비교하면 AkkaStream을 사용안하는게 훨씬더 직관적입니다.
하지만 왜 이것을 사용해야하는것일까요?
AKKA Stream을 시작하기위해서는 위 코드의 차이를 이해하는것에부터 출발합니다.
우선 AkkaStream 코드에서는 큰틀에서 소스와 처리기를 분리 하였습니다.
우선 데이터의 소스는 단순하게 실행을 하려는 설명 청사진에 해당하며 이것은 재활용가능하거나
변환이 가능합니다. 그리고 실행을 분리함으로 계산 처리를 블록킹 없이 다음 처리가 가능합니다.
Loop의 범위가 int.Max라고 가정해봅시다. Loop가 끝날때까지 시간이 꾀걸릴것이며, 다음 진행 코드가 실행될수
없을 것입니다.
AKKA 사전
- Source : 출처가되는 데이터 소스,청사진
- Materrializer : 각 요소를 실행할 엔진
확장
using System.IO; using Akka.IO; actorSystem = ActorSystem.Create("ServiceB"); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); using (var materializer = actorSystem.Materializer()) { source.RunForeach(i => Console.WriteLine(i.ToString()), materializer); var factorials = source.Scan( 1 , (acc, next) => acc + next); var result = factorials .Select(num => ByteString.FromString($"{num}\n")) .RunWith(FileIO.ToFile(new FileInfo("factorials.txt")), materializer); }
기본 스트림처리에서 어떻게 확장되는지 살펴 보겠습니다.
우선 소스인 1-100까지는 변함이 없습니다.
- Scan기능을 통해 전체 스트림에 대해 계산을 실행합니다. ( 1~100까지의 각각의 전체합을 방출 )
- Select에서는 계산된 결과를 어떻게 변환할지에대해 정의를 합니다.
- RunWith에서는 파일을 데이터 수신자로 정의하고(Sink라고 합니다.) , 실행엔진을 선택합니다.
- RunWith는 IOResult를 반환하고 IO작업이 얼만큼/정상적으로 처리가 되었는지를 알려줍니다.
Akka 사전
- Sink : 연산된 결과를 전달받을 대상자
- 변환 : 연산 결과를 Sink대상이 이해할수 있는 데이터로 변환하는 과정
확장2-Resable Pices
using Akka.Streams.IO; public static Sink<string, Task<IOResult>> LineSink(string filename) { return Flow.Create<string>() .Select(s => ByteString.FromString($"{s}\n")) .ToMaterialized(FileIO.ToFile(new FileInfo(filename)), Keep.Right); } public void StreamTest2() { actorSystem = ActorSystem.Create("ServiceB"); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); using (var materializer = actorSystem.Materializer()) { source.RunForeach(i => Console.WriteLine(i.ToString()), materializer); var factorials = source.Scan(1, (acc, next) => acc + next); factorials.Select(_ => _.ToString()).RunWith(LineSink("factorials.txt"), materializer); } }
확장에서 설명된 코드를 다시 더 작은 처리기로 분리 해보겠습니다.
여기서 Akka Stream의 멋진 부분중 하나는 소스를 재사용할수 있을뿐더러
다른 모든 요소도 교체가 가능하다란 것입니다.
- 파일쓰기 Sink를 지정할수 있으며, 필요하면 다른 Sink처리기 구현을 통해 재사용이 가능합니다.
- 스트림은 대부분 왼쪽에서 오른쪽( Keep.Right) 로 흐르며 Akka스트림에서 Flow라고 부릅니다.
Akka 사전
- Flow : 데이터의 흐름을 정의