Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Info

Future(미래)와 PromicePromise(약속)은 JAVA 비동기처리에 있어서 중요한 개념입니다.

미래에는 블록킹이 없기때문에 어느시점 사용가능 해지는 사용가능한 함수의 결과(성공또는 실패등을)

를 담을수 있는 용기라고 보면 됩니다.

Scala에서는 이러한 기능을 위해 동일한 키워드를 지원하며 Scala에서 지원하는 키워드이지만 일반적으로 결과에대한

비동기적인 핸들이라고 이해하시면 보면 되겠으며 C#/JAVA에서도 유사한 모델을 가지고 있습니다.


이것은 AKKA의 요소가아닌 기존 개발 플랫폼의 비동기처리 요소로

나중에 Actor, Akka Stream등과 연동되는 중요한 요소로 사용이 될수 있습니다.


관련 참고 원문: 이러한 컨셉은 Java(8)/.net(4.5)의 경우 최신 버젼에서 지원하는것으로 보아 scala에서 영향을 준것으로 보입니다에서도 지원을 합니다.


Warning

자바7의 java.util.concurrent.Future에 익숙하다면 scala.concurrent.Future가 자바 클래스를 감싼것으로 생각할수도 있지만

실제로는 그렇지 않다. java.util.concurrent.Future 클래스는 폴링을 필요로하며 결과를 얻기위해 블로킹 get 메서드를 사용해야만

한다. 하지만 스칼라의 퓨쳐는 블로킹이나 풀링을 사용하지 않고 함수결과를 조합할수 있으며

JAVA8의 CompletetableFuture<T>가 오히려 여기서 설명하는 퓨쳐와 유사하다.

Note

akka.net을 활용할 경우 asyn/await를 활용할수 있습니다. 


Info

Erik Meijer의 한마디~

"이보게, 브라이언 괴츠, C#,파이선,자바스크립트는 물론 심지어 PHP도 async, await를 지원하고 있다네. 그런 기능이 없는 언어는 자바일뿐이야.

람다를 이용해서 콜백함수를 사용하면 된다고? 천만에 콜백은 최악이야. 도움이 안된다고. 자바 9 버전에 담으려고 하는 걸 다 내려놓고 지금당장

asymc, await부터 넣으라고. 그래야 모두가 행복해질수 있어"


java9에도 async/await 가 결국 적용안되었다는 풍문...

비동기처리 언어 스펙을 사용해야하는 akka에서도 ask패턴의 사용방법이 나뉘게 됩니다.

c#java
Code Block
languagec#
themeEmacs
var result = await actor1.Ask("hello");



Code Block
languagejava
themeEmacs
final FiniteDuration duration = Duration.create(20, TimeUnit.SECONDS); 
Future<Object> future = Patterns.ask(asyncWorker,
  RequestWorkerMsgType.CHECK_FUTURE_STATE, new Timeout(duration)); 
ResponseOnSingeRequest response = (ResponseOnSingeRequest)Await.result(future, duration);


async/await에 비헤 future는 뭔가 불합리하게 많은 코드를 작성하는것으로 보인다.

Future를 사용한 비동기처리 목표

Future와 Promice를 사용하는 목적은 비동기처리를 콜백헬에서 벗어나 결함을 통하여, 여러절차가 포함된 복잡한 비동기처리를

콜백헬을 방지하고, 조금더 도식화 할수있는 방법을 제공해줍니다구조화할수 있게 도와줍니다.

콜백헬 예 : http://callbackhell.com/ 

Future

draw.io Diagram
bordertrue
viewerToolbartrue
fitWindowfalse
diagramNameasync-future
simpleViewerfalse
width
diagramWidth581
revision2

...

Info

퓨쳐는 여러함수의 결과가 다음 함수의 입력이되어야할시, 여러 함수가 병렬로 연결되며

나중에 여러 함수의 결과를 조합해야하는 파이프라이닝(pipelining)에 좋은 도구입니다.


Future

...

Sample-Scala

draw.io Diagram
bordertrue
viewerToolbartrue
fitWindowfalse
diagramNametrafficService
simpleViewerfalse
width
diagramWidth691
revision1

...

Code Block
languagescala
themeEmacs
titleFuture를 사용
var request = EventRequest(ticketNr)    //요청을만든다
val futureRoute : Future[Route] = Future{
  callEventService(request).event
}.map{ event =>
  val trafficRequest = TrafficRequest{
    destination = event.location,
    arrivalTime = event.time
  )
  callTrafficeService(trafficRequest).route  //교통 루트를 반환한다.
}


Future Sample

Ff963556.dda12864-f9bd-4340-831f-212c4ee8bc4a(en-us,PandP.10).pngImage Added

Code Block
languagejava
themeEmacs
titleJAVA
import java.util.concurrent.CompletableFuture;
 
 
//선언부
@FunctionalInterface
interface FuncB {
    public int calc(int a, int b);
}
  
@FunctionalInterface
interface FuncA {
    public int calc(int a);
}
 
//사용부
FuncA F1 = (int a) -> a+1;
FuncA F2 = (int a) -> a+1;
FuncA F3 = (int a) -> a+1;
FuncB F4 = (int a,int b) -> a+b;
              
Integer input=1;
CompletableFuture<Integer> futureB
        = CompletableFuture.supplyAsync(() -> F1.calc(1) );
 
CompletableFuture<Integer> futureD
        = CompletableFuture.supplyAsync(() -> F3.calc(F2.calc(1))  );
 
CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(futureB, futureD)
        .thenAccept( r-> {
            // 최종 연산로직을 비동기로 처리한 케이스
            Integer value1 = futureB.join();
            Integer value2 = futureD.join();
            log( String.format("async result:%d", value1+value2 ));
        }

...

);
combinedFuture.join(); //연산을 비동기적으로 시작하려면 join명령을 사용합니다.
// 최종 연산로직을 동기처리로 변환한 예
combinedFuture.get();   //동기처리로 진행하려면 get을 사용합니다.
Integer sresult = futureB.get() + futureD.get();
log( String.format("sync result:%d",sresult) );
Code Block
languagec#
themeEmacs
titleC#
//Labda
Func<int, int> F1 = x => x + 1;
Func<int, int> F2 = x => x + 1;
Func<int, int> F3 = x => x + 1;
Func<int, int, int> F4 = delegate ( int x, int y ){
     return x + y;
};            

//Block with task
Task<int> futureA = Task.Factory.StartNew<int>(() => F1(1));
int c = F2(1);
int d = F3(c);
int f = F4(futureA.Result, d);
Console.WriteLine("ResultA:" + f);

//Continuation Tasks
var futureB = Task.Factory.StartNew<int>(() => F1(1));
var futureD = Task.Factory.StartNew<int>(() => F3(F2(1)));
var futureF = Task.Factory.ContinueWhenAll<int, int>(
                 new[] { futureB, futureD },
                 ( tasks ) => F4(futureB.Result, futureD.Result));

futureF.ContinueWith(( t ) =>
    Console.WriteLine("ResultF:" + t.Result)
);

Console.WriteLine("Code...End");


JAVA와 C#코드가 흡사합니다. 복합된 비동기처리를 위한 방식은 여러가지 방법이 있겠으나

위와같이 람다식을 활용할수도 있습니다.


Promise (미래를 위한 약속)

퓨쳐가 읽기전용 용기라고하면, 퓨쳐자체를 반환하는것을 작성하고 싶을때는 Promice를 사용하면 됩니다.사용합니다.

Future 용기는 java/.net에 대응가능하지만 Promise를 직접적으로 대응하는 키워드는 없는것으로 보입니다.

SCALA코드의 예를 통해 Promise의 기능을 살펴보겠습니다.


Promise는 Promice는 퓨쳐에 비교하여 다음과 같은 특징을 가집니다.

  • Promice는 Promise는 미래를 위한 약속이기때문에 단한번만 완료될수있습니다.(다시 완료시 예외발생)
  • Promice는 Promise는 성공과 실패처리에대해 분리하여 처리할수 있습니다.

Promice 샘플 - KafkaSend

Code Block
languagescala
themeEmacs
def sendTokafka(recode: ProducerRecod): Futrure[RecordMetadata] = {
  val promise : Promise[[RecordMetadata]] = Promise[RocordMetadata]() //RecordMetadata 타잎의 값을 돌려주는 약속을 만든다.
  val future:Future[RecordMetadata] = promise.future  //future에 대한 참조를 가져온다
  
  val callback = new CallBack(){  //송신성공시 카프카 콜백
    def onCompletition(metadata: RecordMetadata, e:Exception): Unit={
      if(e != null ) promise.failure(e)  //오류기록
      else promise.success(metadata)     //성공기록
    }
  }
  producer.send(record,callback)
  future
}

퓨쳐를 조합하기


Anchor
futureactor
futureactor

퓨쳐와 액터 조합하기

.net 에서 Actor 메시지와 Future(Task)를 조합한 샘플

Code Block
languagec#
themeEmacs
public class ReActor : ReceiveActor
{
  private ILoggingAdapter log = Context.GetLogger();

  public ReActor()
  {
      string myPath = Self.Path.ToString();

      Receive<string>(message => {
          Handle(message);                
      });

      Receive<DelayReply>(message => {
          Handle(message);
      });
  }

  public void Handle(string str)      //InMessage
  {
      Task.Run(async () =>
      {
          await Task.Delay(1000); //어떠한 값을 기다림동기적함수를 호출하여 지연시키지만 액터는 멈추지 않습니다.
          DelayReply reply = new DelayReply();
          reply.message = str;
          return reply;
      }).PipeTo(Self);
  }

  public void Handle(DelayReply data) //Out
  {
      string logtrace = string.Format("I'am {0} RE:{1}", Self.Path, data.message);
      log.Info(data.message);
      Sender.Tell(data);
  }
          
}

이것은 기존에 가진 모듈을(액터모델에 맞지않은) 모두 액터로 변환하여 불필요하게 복잡성을

늘릴필요가 없을시 기존 비동기처리 코드를 재사용하여 파이프를 통해 액터와 유연하게 연동이 가능합니다.

이렇게 사용했을시 부가적인 이득은  설정만으로 여러장치에 확장이가능하며 클러스터화가 될수 있다란것입니다.

액터에 블락킹을 요구하는 긴작업이 필요하게 될시 유용하게 사용할수 있습니다.

액터의 메시징 시스템을 블락킹하지 않고 Task에 실행을 동시성처리를 진행한후 

액터메시지의 블락킹없이 그 결과를 다시 전송받을수 있습니다.


코틀린 기반

코틀린의 경우 자바가 지원하지 않는 async/await 를 지원합니다. 

Code Block
languagec#
import kotlinx.coroutines.*
fun main() = runBlocking {
    val sumDeferred = async {
        var sum = 0
        for (i in 1..100) {
            sum += i
        }
        sum
    }
    println("Sum: ${sumDeferred.await()}")
}



AKKA의 Graph(DSL) 소개

Akka에서는 Graph를 활용하여, Future와 Promice를 더 우아하게 조합하여 흘려보냅니다.

AkkaStream처리 방식이며  동시성 처리를 위한 이러한 GRAPH-DSL기법은, 언어스펙들이 높아지면서 기본으로 담고 있는경우도 있습니다.

(ex java8에는 없지만, 12에는 이러한 컨셉이 생김 : 실제 확인은 못해보았습니다. java8에서는 이러한 GraphStream방식을 akka를 통해서 복잡한 동시성처리를 Graph로 설계할수 있습니다.)

Image Added

Code Block
themeEmacs
titleCode
collapsetrue
actorSystem = ActorSystem.Create("ServiceB");
materializer = actorSystem.Materializer();
 
var g = RunnableGraph.FromGraph(GraphDsl.Create(builder =>
{
    var source = Source.From(Enumerable.Range(1, 10));
    var sink = Sink.Ignore<int>().MapMaterializedValue(_ => NotUsed.Instance);               
    var sinkConsole = Sink.ForEach<int>(x=>Console.WriteLine(x.ToString()))
        .MapMaterializedValue(_ => NotUsed.Instance);
 
    var broadcast = builder.Add(new Broadcast<int>(2));
    var merge = builder.Add(new Merge<int>(2));
 
    var f1 = Flow.Create<int>().Select(x => x + 10);
    var f2 = Flow.Create<int>().Select(x => x + 10);
    var f3 = Flow.Create<int>().Select(x => x + 10);
    var f4 = Flow.Create<int>().Select(x => x + 10);
 
    builder.From(source).Via(f1).Via(broadcast).Via(f2).Via(merge).Via(f3).To(sinkConsole);
    builder.From(broadcast).Via(f4).To(merge);
 
    return ClosedShape.Instance;
}));
 
g.Run(materializer);


  • in : 입력은 1~10까지입니다.
  • f1 ~f4 : 각 팬을 지날떄마다 +10씩 더합니다.
  • bcast : 분기가 일어나는 구간이며, 요소가 분기수만큼 복제가 됩니다.
  • merge : 분기된 요소가 다시 집합하는 구간입니다. 
  • 녹색: 왼쪽에서 오른쪽으로 흐르는 하나의 흐름(혹은 위에서 아래로)
  • 노랑: 분기가 갈라지거나 합해지는 구간(  bcast : out이 2개 , merger : in 이 2개 )
  • 레드:  분기 처리


Link : Working with Graphs이것은 로컬 로직을 확장할수 있는 방법중에 하나가 될것입니다.