You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

흐름정의를 통한 병렬 스트림처리

다음을 구현해야된다고 해봅시다.

  • 원본 소스는 1부터 100까지 입니다.
  • 1차 연산은  x+1을 하며
  • 2차 연산은 1차연산 결과값 * 2 을 합니다. 
  • 최종 연산값은 콘솔에 표시가됩니다. (Sink)

전제조건으로 덧셈연산과 곱하기 연산이 아주 비싸다고 가정을 하고

연산의 효율을 위해  각각의 연산을 비동기적으로 병렬처리로 분리를 하는것을 구현해봅시다.


위 흐름과 같이 , 비동기적으로 처리할수 있는 앞단부터 수행을 하고 이것이 완료되는 대로 뒷단 부분과

연결이되어 계산을 수행하고 그 결과를 어떠한 목적지 (Sink)로 정확한 실행 순서(Source)로 흘려 보내는것입니다.

스레드 모델을  직접 사용하여 구현시 이 구현체는 어려울것입니다.  특히나 비동적으로 수행된 결과의

순서를 맞추는것은 더욱더 복잡한 일이 될것입니다. 


AkkaStream에서는 다음과 같은 방법으로 단순화할수가 있습니다.

var actorSystem = ActorSystem.Create("ServiceB");
var materializer = actorSystem.Materializer();

var runnablegraph= Source.From(Enumerable.Range(1, 100))
    .Select(x => x + 1)
    .Async()  //이 키워드로 비동기 처리영역을 구분합니다.
    .Select(x => x * 2)
    .To(Sink.ForEach<int>(x => Console.WriteLine(x.ToString())));

runnablegraph.Run(materializer);


실제로 단순한 덧셈/곱셉 처리는 아주 빠르기때문에 이렇게 나누는게 효율적이지 않을것이나

이것은 AKKA의 Stream처리가  단순하고 강력하다는것을 보여주는 한가지예입니다.

C#의 PLINQ(https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/introduction-to-plinq) 와 사용패턴과 목적이 유사합니다.

네트워크가 없는 단일 컴퓨터에서는 동일한 목표를 가지고 있지만, AkkaStream은 분산 컴퓨터 처리로 확장이 됩니다.

이것은 AkkaStream의 기능을 더 살펴보고 후반부에서 설명을 진행하도록 하겠습니다. 


관련 용어 정리:

  • Source : 하나의 출력을 갖는 처리단계, 수신할 준비가 될때마다 데이터 요소를 방출합니다.
  • Sink : 하나의 입력을 가진 처리단계, 요청된 데이터요소를 받아들입니다.
  • Flow : 입출력을 가지는 처리단계의 흐름입니다.  ( Select → Async → Select)
  • RunnableGraph : 양쪽 끝이 소스및 싱크에 연결되어 실행준비가 된 흐름입니다.

작동방식 도식화

스트림처리가 다소 초기개념을 잡기 어려운점이 있지만 

AKKA의 스트림은  배관공 게임처럼 이러한 블럭을 조립을 할수있게합니다.


입출력 양끝단을 정의하며 ( Source - Sink ), 중간에 흐름을 변화시키는 (Flow)를

정의하는것입니다. 데이터의 흐름이 시작하여 종착지까지 도착하는것을 정의하는

최소단위가 RunnableGraph 입니다.


스트림처리의 확장

스트림처리는 AKKA의 주제에서도 광범위하고 어렵습니다.

배관공 게임도 분명 끝판 스테이지는 어렵듯이

우리는 위와같이 복잡한 스트림처리를 단순화하기위해 스트림의 결합요소에 대해 더 연구해보겠습니다.


다양한 소스/싱크처리

// Create a source from an Iterable
Source.From(new List<int> {1, 2, 3});

// Create a source from a Task
Source.FromTask(Task.FromResult("Hello Streams!"));

// Create a source from a single element
Source.Single("only one element")

// an empty source
Source.Empty<int>();

// Sink that aggregates over the stream and returns a Task
// of the final result as its materialized value
Sink.Aggregate<int, int>(0, (sum, i) => sum + i);

// Sink that returns a Task as its materialized value,
// containing the first element of the stream
Sink.First<int>();

// A Sink that consumes a stream without doing anything with the elements
Sink.Ignore<int>();

// A Sink that executes a side-effecting call for every element of the stream
Sink.ForEach<string>(Console.WriteLine);


흐름처리를 조금더 명시화한 스트림처리

actorSystem = ActorSystem.Create("ServiceB");
materializer = actorSystem.Materializer();

var flow = Flow.Create<int>().Select(x => x * 2);
var flow2 = Flow.Create<int>().Select(x => x + 3);
var runnablegraph = Source.From(Enumerable.Range(1, 10))
    .Via(flow)  //Via는 Source와 Flow를 결합하여 Source로 만듭니다. 
    .Via(flow2) //Via의 결과값이 Source이기때문에 다시 Flow와 조합이 가능
    .Where(x => x > 10 ) //Filter조건으로 10이하의 값은 버린다.
    .To( Sink.ForEach<int>(x => Console.WriteLine(x.ToString())) );

runnablegraph.Run(materializer);





  • No labels