Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

기본

Code Block
languagec#
themeEmacs
titleStream처리
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;


actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{
    source.RunForeach(i => Console.WriteLine(i.ToString()), materializer);
    Console.WriteLine("Loop가 끝나지 않아도 동시에 실행됨");
}

...

AKKA Stream을 시작하기위해서는 위 코드의  차이를 이해하는것에부터 출발합니다.

우선 AkkaStream 코드에서는 큰틀  데이터의 큰틀에서  소스와 처리기를 분리하였습니다분리 하였습니다.

우선 데이터의 소스는 단순하게 실행을 하려는 설명 청사진에 해당하며 이것은 재활용가능하거나

...

Loop의 범위가 int.Max라고 가정해봅시다. Loop가 끝날때까지 시간이 꾀걸릴것이며, 다음 진행 코드가 실행될수

없을 것입니다.위 샘플에서 언급된 단어를 다음과 같이 정의 하겠습니다.


AKKA 사전

  • Source : 출처가되는 데이터 소스,청사진
  • Materrializer : 각 요소를 실행할  엔진


확장

Code Block
languagec#
themeEmacs
using System.IO;
using Akka.IO;


actorSystem = ActorSystem.Create("ServiceB");
Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
using (var materializer = actorSystem.Materializer())
{
    source.RunForeach(i => Console.WriteLine(i.ToString()), materializer);

    var factorials = source.Scan( 1 , (acc, next) => acc + next);
    var result =
        factorials
            .Select(num => ByteString.FromString($"{num}\n"))
            .RunWith(FileIO.ToFile(new FileInfo("factorials.txt")), materializer);
}


기본 스트림처리에서 어떻게 확장되는지 살펴 보겠습니다.

우선 소스인 1-100까지는 변함이 없습니다. 

  • Scan기능을 통해 전체 스트림에 대해 계산을 실행합니다. ( 1~100까지의 각각의 전체합을 방출 )
  • Select에서는 계산된 결과를 어떻게 변환할지에대해 정의를 합니다.
  • RunWith에서는 파일을 데이터 수신자로 정의하고(Sink라고 합니다.) , 실행엔진을 선택합니다.
  • RunWith는 IOResult를 반환하고 IO작업이 얼만큼/정상적으로 처리가 되었는지를 알려줍니다.

Akka 사전

  • Sink : 연산된 결과를 전달받을 대상자
  • 변환 : 연산 결과를 Sink대상이 이해할수 있는 데이터로 변환하는 과정

확장2-Resable Pices

Code Block
languagec#
themeEmacs
using Akka.Streams.IO;


public static Sink<string, Task<IOResult>> LineSink(string filename)
{
    return Flow.Create<string>()
      .Select(s => ByteString.FromString($"{s}\n"))
      .ToMaterialized(FileIO.ToFile(new FileInfo(filename)), Keep.Right);
}

public void StreamTest2()
{
    actorSystem = ActorSystem.Create("ServiceB");
    Source<int, NotUsed> source = Source.From(Enumerable.Range(1, 100));
    using (var materializer = actorSystem.Materializer())
    {
        source.RunForeach(i => Console.WriteLine(i.ToString()), materializer);
        
        var factorials = source.Scan(1, (acc, next) => acc + next);
        factorials.Select(_ => _.ToString()).RunWith(LineSink("factorials.txt"), materializer);
    }
}

확장에서 설명된 코드를 다시 더 작은 처리기로 분리 해보겠습니다.

여기서 Akka Stream의 멋진 부분중 하나는 소스를 재사용할수 있을뿐더러

다른 모든 요소도 교체가 가능하다란 것입니다.

  • 파일쓰기 Sink를 지정할수 있으며, 필요하면 다른 Sink처리기 구현을 통해 재사용이 가능합니다.
  • 스트림은 대부분 왼쪽에서 오른쪽( Keep.Right) 로 흐르며 Akka스트림에서 Flow라고 부릅니다.


Akka 사전

  • Flow : 데이터의 흐름을 정의