Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Sink : 연산된 결과를 전달받을 대상자(소비자)
  • 변환 : 연산 결과를 Sink대상이 이해할수 있는 데이터로 변환하는 과정

확장2-

...

Reusable Pices

Code Block
languagec#
themeEmacs
using Akka.Streams.IO;


public static Sink<string, Task<IOResult>> LineSink(string filename)
{
    return Flow.Create<string>()
      .Select(s => ByteString.FromString($"{s}\n"))
      .ToMaterialized(FileIO.ToFile(new FileInfo(filename)), Keep.Right);
}

public void StreamTest2()
{
    actorSystem = ActorSystem.Create("ServiceB");
    Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
    using (var materializer = actorSystem.Materializer())
    {
        source.RunForeach(i => Console.WriteLine(i.ToString()), materializer);
        
        var factorials = source.Scan(1, (acc, next) => acc + next);
        factorials.Select(_ => _.ToString()).RunWith(LineSink("factorials.txt"), materializer);
    }
}

...

Akka 사전

  • Flow : 데이터의 흐름을 정의


확장3-TimeBased Processing

Code Block
languagec#
themeEmacs
actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{                
    var factorials = source.Scan(1, (acc, next) => acc + next);                
    factorials
         //.ZipWith(Source.From(Enumerable.Range(0, 100)), (num, idx) => $"{idx}! = {num}")
         .Throttle(1, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping)
         .RunForeach(i => Console.WriteLine(i.ToString()), materializer).Wait();                
}

스트림을 특정속도로 흐를수 있도록 제한을 할수가 있습니다.

스로틀(throttle) 연결자를 사용하여 초당 1요소만 처리할수 있도록 설정을 하였으며

실제 콘솔로그를 통해 초당 1개씩만 처리하는 결과를 볼수가 있습니다.

이러한 기능의 중요성은 전통적인 개발방법에서 사용되는 속도제한을 위해 루프문안에서 sleep(블락킹)처리를

할필요가 없다란것이며 , 그러한 처리가 사실은 스레드 효율을 떨어트리고 부적절하다란것입니다.


여기서 설명된것은 AkkaStream의 기본이며 , 우리는 다양한 소스및 싱크 또한 선택할수 있는 

많은 스트림 변환조합이 있다란 것을 조금식 살펴보면되겠습니다.