...

Info

Write a Java program that uses Akka Streams to store arbitrary events with high performance.

Here is how to write a Java program that uses Akka Streams to store arbitrary events with high performance.

...

Code Block
languagejava
themeEmacs
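// toMat(sink, Keep.right()) keeps the sink's CompletionStage<IOResult>; plain to(sink) would keep the source's NotUsed.
RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).toMat(sink, Keep.right());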


3. The full code is as follows.

Code Block
languagejava
themeEmacs
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;

public class EventStorageApp {

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create();
        ActorMaterializer materializer = ActorMaterializer.create(system);

        // Endless stream of synthetic events (random UUID strings).
        Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator());
        // FileIO.toPath consumes ByteString elements, not String.
        Sink<ByteString, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file"));
        // Limit throughput to 60 events per second, then encode each event as one line.
        Flow<String, ByteString, NotUsed> throttle = Flow.of(String.class)
            .throttle(60, Duration.ofSeconds(1))
            .map(event -> ByteString.fromString(event + "\n"));

        // Keep.right() keeps the sink's CompletionStage<IOResult>; plain to(sink) would keep NotUsed.
        RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).toMat(sink, Keep.right());
        CompletionStage<IOResult> completion = runnableGraph.run(materializer);

        // Runs only when the stream completes; with the infinite source above
        // that does not happen unless the stream is stopped.
        completion.thenAccept(result -> {
            System.out.println("File write result: " + result);
            system.terminate();
        });
    }

    private static Iterator<String> eventIterator() {
        return Stream.generate(() -> UUID.randomUUID().toString()).iterator();
    }

}

Info

Modify the code above to use a state machine so that it stores the data collected over 3 seconds.

...

Code Block
languagejava
themeEmacs
Flow<String, String, NotUsed> flow = Flow.of(String.class)
    .statefulMapConcat(() -> {
        EventBuffer buffer = new EventBuffer(State.COLLECTING);
        return event -> {
            Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event);
            buffer = result._1;
            result._2.run



Info

Convert this code to use Java streams instead of Akka.


The code above can be converted to use the Java Stream API as follows.


Code Block
languagejava
themeEmacs
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;

public class EventStorageApp {

    public static void main(String[] args) throws InterruptedException {
        List<String> events = new ArrayList<>();

        final Duration collectDuration = Duration.ofSeconds(3);
        final Duration writeDuration = Duration.ofMillis(10);
        Instant end;

        while (true) {
            Iterator<String> eventIterator = eventIterator();
            end = Instant.now().plus(collectDuration);

            // Collect events for 3 seconds.
            while (eventIterator.hasNext() && Instant.now().isBefore(end)) {
                events.add(eventIterator.next());
            }

            if (!Instant.now().isBefore(end)) {
                // Once 3 seconds have passed, store the collected events.
                // The second FileWriter argument (true) appends instead of overwriting earlier batches.
                try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file", true))) {
                    for (String event : events) {
                        writer.write(event);
                        writer.newLine();
                    }
                    events.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            Thread.sleep(writeDuration.toMillis());
        }
    }

    private static Iterator<String> eventIterator() {
        // Code that generates events (random UUID strings here).
        return Stream.generate(UUID::randomUUID)
                .map(UUID::toString)
                .iterator();
    }
}
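
The collect-for-3-seconds-then-write loop above can also be written as a small explicit state machine, which is closer to what the earlier prompt asked for. The following is only a minimal sketch under stated assumptions: `WindowBatcher`, its `State` enum, and the injected clock and writer are hypothetical names introduced for illustration, and the writer is left abstract instead of writing to `/path/to/file`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch: a two-state batcher that flushes events once a time window has elapsed. */
final class WindowBatcher {

    enum State { COLLECTING, FLUSHING }

    private final long windowMillis;
    private final LongSupplier clock;            // injected so the window can be exercised without sleeping
    private final Consumer<List<String>> writer; // assumption: e.g. a function that appends a batch to a file
    private final List<String> buffer = new ArrayList<>();
    private State state = State.COLLECTING;
    private long windowStart;

    WindowBatcher(long windowMillis, LongSupplier clock, Consumer<List<String>> writer) {
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.writer = writer;
        this.windowStart = clock.getAsLong();
    }

    /** Buffers the event, then flushes if the current window has elapsed. */
    void onEvent(String event) {
        buffer.add(event);
        tick();
    }

    /** Drives the state machine; call it periodically so that quiet windows still flush. */
    void tick() {
        if (state == State.COLLECTING && clock.getAsLong() - windowStart >= windowMillis) {
            state = State.FLUSHING;
        }
        if (state == State.FLUSHING) {
            if (!buffer.isEmpty()) {
                writer.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
                buffer.clear();
            }
            windowStart = clock.getAsLong();
            state = State.COLLECTING;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};                              // fake clock in milliseconds
        List<List<String>> written = new ArrayList<>();
        WindowBatcher batcher = new WindowBatcher(3000, () -> now[0], written::add);

        batcher.onEvent("a");                           // t = 0 s, buffered
        now[0] = 2000;
        batcher.onEvent("b");                           // t = 2 s, still inside the window
        now[0] = 3000;
        batcher.onEvent("c");                           // t = 3 s, window elapsed: a, b, c flushed together
        System.out.println(written);                    // prints [[a, b, c]]
    }
}
```

Injecting the clock keeps the 3-second window deterministic in tests. In a real program the clock would be `System::currentTimeMillis` and `tick()` would be called from a timer, so a batch is still written when no new event arrives.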



Summary

...

ChatGPT did not produce satisfactory code, so I wrote the code I will actually use myself.

High-performance asynchronous batch stream processing code: java-labs/SimpleActorTest.java at master · psmon/java-labs (github.com)


Code Block
themeEmacs
    @Test
    @DisplayName("Actor - SafeBatchActor Test")
    public void SafeBatchActor(){

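        // Combining ThrottlerTest (per-second rate limit) with TimerActor (per-second processing)
        // gives smart, high-performance, near-real-time bulk processing.
        // With simple code you can build the same functionality as the FSMBatch processor.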

        new TestKit(system) {
            {
                final Materializer materializer = ActorMaterializer.create(system);

                ActorRef batchActor = system.actorOf(SafeBatchActor.Props(), "batchActor");

                int processCountPerSec = 100;
                final ActorRef throttler =
                        Source.actorRef(1000, OverflowStrategy.dropNew())
                                .throttle(processCountPerSec, FiniteDuration.create(1, TimeUnit.SECONDS),
                                        processCountPerSec, (ThrottleMode) ThrottleMode.shaping())
                                .to(Sink.actorRef(batchActor, akka.NotUsed.getInstance()))
                                .run(materializer);


                for(int i=0 ; i<10000 ; i++){
                    throttler.tell("#### Hello World!",ActorRef.noSender());
                }

                expectNoMessage(Duration.ofSeconds(10));

            }
        };
    }
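
One detail worth checking in the test above: `Source.actorRef(1000, OverflowStrategy.dropNew())` buffers at most 1000 elements and discards newly arriving ones while the buffer is full, so a tight loop of 10,000 `tell` calls against a throttle of 100 per second will likely lose most of the messages. The sketch below is only a plain-Java analogy of that drop-new behaviour, with no consumer draining the buffer; `DropNewBuffer` is a hypothetical name and is not part of Akka or of the linked repository.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical analogy of a bounded buffer that drops new elements when it is full. */
final class DropNewBuffer {

    private final BlockingQueue<String> queue;
    private long dropped;

    DropNewBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the message was buffered; a full buffer rejects (drops) the new message. */
    boolean offer(String message) {
        boolean accepted = queue.offer(message); // non-blocking: returns false when the queue is full
        if (!accepted) {
            dropped++;
        }
        return accepted;
    }

    int buffered() {
        return queue.size();
    }

    long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        DropNewBuffer buffer = new DropNewBuffer(1000);
        for (int i = 0; i < 10000; i++) {
            buffer.offer("#### Hello World!");
        }
        // Nothing drains the queue here, so only the first 1000 messages are kept.
        System.out.println("buffered=" + buffer.buffered() + " dropped=" + buffer.dropped());
        // prints buffered=1000 dropped=9000
    }
}
```

If every message must reach the batch actor, the buffer size, the send rate, or the overflow strategy would need to be adjusted accordingly.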