Page History
...
- Source : 출처가되는 데이터 소스,청사진 (원소 생산자)
- Materrializer : 각 요소를 실행할 엔진 (처리 노드)
확장
Code Block | ||||
---|---|---|---|---|
| ||||
using System.IO; using Akka.IO; actorSystem = ActorSystem.Create("ServiceB"); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); using (var materializer = actorSystem.Materializer()) { source.RunForeach(i => Console.WriteLine(i.ToString()), materializer); var factorials = source.Scan( 1 , (acc, next) => acc + next); var result = factorials .Select(num => ByteString.FromString($"{num}\n")) .RunWith(FileIO.ToFile(new FileInfo("factorials.txt")), materializer); } |
...
Akka 사전
- Sink : 연산된 결과를 전달받을 대상자(소비자)
- 변환 : 연산 결과를 Sink대상이 이해할수 있는 데이터로 변환하는 과정
확장2-
...
Reusable Pices
Code Block | ||||
---|---|---|---|---|
| ||||
using Akka.Streams.IO; public static Sink<string, Task<IOResult>> LineSink(string filename) { return Flow.Create<string>() .Select(s => ByteString.FromString($"{s}\n")) .ToMaterialized(FileIO.ToFile(new FileInfo(filename)), Keep.Right); } public void StreamTest2() { actorSystem = ActorSystem.Create("ServiceB"); Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100)); using (var materializer = actorSystem.Materializer()) { source.RunForeach(i => Console.WriteLine(i.ToString()), materializer); var factorials = source.Scan(1, (acc, next) => acc + next); factorials.Select(_ => _.ToString()).RunWith(LineSink("factorials.txt"), materializer); } } |
...
- 파일쓰기 Sink를 지정할수 있으며, 필요하면 다른 Sink처리기 구현을 통해 재사용이 가능합니다.
- 스트림은 대부분 왼쪽에서 오른쪽스트림처리 결과는 양방향으로 다시 흘러갈수 있으며, 단방향처리에서는 일반적으로 오르쪽으( Keep.Right) 로 흐르며 Akka스트림에서 로만 흐르며 이것을 Flow라고 부릅니다.
Akka 사전
- Flow : 데이터의 흐름을 정의
확장3-TimeBased Processing
Code Block | ||||
---|---|---|---|---|
| ||||
actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{
var factorials = source.Scan(1, (acc, next) => acc + next);
factorials
//.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}")
.Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
.RunForeach(i => Console.WriteLine(i.ToString()), materializer).Wait();
} |
스트림을 특정속도로 흐를수 있도록 제한을 할수가 있습니다.
스로틀(throttle) 연결자를 사용하여 초당 1요소만 처리할수 있도록 설정을 하였으며
실제 콘솔로그를 통해 초당 1개씩만 처리하는 결과를 볼수가 있습니다.
이러한 기능의 중요성은 전통적인 개발방법에서 사용되는 속도제한을 위해 루프문안에서 sleep(블락킹)처리를
할필요가 없다란것이며 , 그러한 처리가 사실은 스레드 효율을 떨어트리고 부적절하다란것입니다.
여기서 설명된것은 AkkaStream의 기본이며 , 우리는 다양한 소스및 싱크 또한 선택할수 있는
많은 스트림 변환조합이 있다란 것이며
다음장에서는 스트림처리에 필요한 RunnableGraph를 통해 조금더 단순화해보겠습니다.