Page History
기본
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
using Akka; using Akka.Streams; using Akka.Streams.Dsl; actorSystem = ActorSystem.Create("ServiceB"); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); using (var materializer = actorSystem.Materializer()) { source.RunForeach(i => Console.WriteLine(i.ToString()), materializer); Console.WriteLine("Loop가 끝나지 않아도 동시에 실행됨"); } |
...
AKKA Stream을 시작하기위해서는 위 코드의 차이를 이해하는것에부터 출발합니다.
우선 AkkaStream 코드에서는 큰틀 데이터의 큰틀에서 소스와 처리기를 분리하였습니다분리 하였습니다.
우선 데이터의 소스는 단순하게 실행을 하려는 설명 청사진에 해당하며 이것은 재활용가능하거나
...
Loop의 범위가 int.Max라고 가정해봅시다. Loop가 끝날때까지 시간이 꾀걸릴것이며, 다음 진행 코드가 실행될수
없을 것입니다.위 샘플에서 언급된 단어를 다음과 같이 정의 하겠습니다.
AKKA 사전
- Source : 출처가되는 데이터 소스,청사진 (원소 생산자)
- Materrializer : 각 요소를 실행할 엔진 (처리 노드)
확장
Code Block | ||||
---|---|---|---|---|
| ||||
using System.IO;
using Akka.IO;
actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{
source.RunForeach(i => Console.WriteLine(i.ToString()), materializer);
var factorials = source.Scan( 1 , (acc, next) => acc + next);
var result =
factorials
.Select(num => ByteString.FromString($"{num}\n"))
.RunWith(FileIO.ToFile(new FileInfo("factorials.txt")), materializer);
} |
기본 스트림처리에서 어떻게 확장되는지 살펴 보겠습니다.
우선 소스인 1-100까지는 변함이 없습니다.
- Scan기능을 통해 전체 스트림에 대해 계산을 실행합니다. ( 1~100까지의 각각의 전체합을 방출 )
- Select에서는 계산된 결과를 어떻게 변환할지에대해 정의를 합니다.
- RunWith에서는 파일을 데이터 수신자로 정의하고(Sink라고 합니다.) , 실행엔진을 선택합니다.
- RunWith는 IOResult를 반환하고 IO작업이 얼만큼/정상적으로 처리가 되었는지를 알려줍니다.
Akka 사전
- Sink : 연산된 결과를 전달받을 대상자(소비자)
- 변환 : 연산 결과를 Sink대상이 이해할수 있는 데이터로 변환하는 과정
확장2-Reusable Pices
Code Block | ||||
---|---|---|---|---|
| ||||
using Akka.Streams.IO;
public static Sink<string, Task<IOResult>> LineSink(string filename)
{
return Flow.Create<string>()
.Select(s => ByteString.FromString($"{s}\n"))
.ToMaterialized(FileIO.ToFile(new FileInfo(filename)), Keep.Right);
}
public void StreamTest2()
{
actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{
source.RunForeach(i => Console.WriteLine(i.ToString()), materializer);
var factorials = source.Scan(1, (acc, next) => acc + next);
factorials.Select(_ => _.ToString()).RunWith(LineSink("factorials.txt"), materializer);
}
} |
확장에서 설명된 코드를 다시 더 작은 처리기로 분리 해보겠습니다.
여기서 Akka Stream의 멋진 부분중 하나는 소스를 재사용할수 있을뿐더러
다른 모든 요소도 교체가 가능하다란 것입니다.
- 파일쓰기 Sink를 지정할수 있으며, 필요하면 다른 Sink처리기 구현을 통해 재사용이 가능합니다.
- 스트림처리 결과는 양방향으로 다시 흘러갈수 있으며, 단방향처리에서는 일반적으로 오르쪽으( Keep.Right) 로만 흐르며 이것을 Flow라고 부릅니다.
Akka 사전
- Flow : 데이터의 흐름을 정의
확장3-TimeBased Processing
Code Block | ||||
---|---|---|---|---|
| ||||
actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{
var factorials = source.Scan(1, (acc, next) => acc + next);
factorials
//.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}")
.Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
.RunForeach(i => Console.WriteLine(i.ToString()), materializer).Wait();
} |
스트림을 특정속도로 흐를수 있도록 제한을 할수가 있습니다.
스로틀(throttle) 연결자를 사용하여 초당 1요소만 처리할수 있도록 설정을 하였으며
실제 콘솔로그를 통해 초당 1개씩만 처리하는 결과를 볼수가 있습니다.
이러한 기능의 중요성은 전통적인 개발방법에서 사용되는 속도제한을 위해 루프문안에서 sleep(블락킹)처리를
할필요가 없다란것이며 , 그러한 처리가 사실은 스레드 효율을 떨어트리고 부적절하다란것입니다.
여기서 설명된것은 AkkaStream의 기본이며 , 우리는 다양한 소스및 싱크 또한 선택할수 있는
많은 스트림 변환조합이 있다란 것이며
다음장에서는 스트림처리에 필요한 RunnableGraph를 통해 조금더 단순화해보겠습니다.