Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeEmacs
Flow<String, String, NotUsed> flow = Flow.of(String.class)
.statefulMapConcat(() -> {
EventBuffer buffer = new EventBuffer(State.COLLECTING);
return event -> {
Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event);
buffer = result._1;
result._2.run



Info

이 코드를 akka를 사용하지 않고 java stream 을 사용하여 변환해죠


위의 코드를 Java Stream API를 사용하여 변환하는 방법은 다음과 같습니다.


Code Block
languagejava
themeEmacs
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class EventStorageApp {

    public static void main(String[] args) throws InterruptedException {
        List<String> events = new ArrayList<>();

        final Duration collectDuration = Duration.ofSeconds(3);
        final Duration writeDuration = Duration.ofMillis(10);
        Instant end;

        while (true) {
            Iterator<String> eventIterator = eventIterator();
            end = Instant.now().plus(collectDuration);

            // 3초간 이벤트를 모읍니다.
            while (eventIterator.hasNext() && Instant.now().isBefore(end)) {
                events.add(eventIterator.next());
            }

            if (Instant.now().isAfter(end)) {
                // 3초가 지나면 이벤트를 저장합니다.
                try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file"))) {
                    for (String event : events) {
                        writer.write(event);
                        writer.newLine();
                    }
                    events.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            Thread.sleep(writeDuration.toMillis());
        }
    }

    private static Iterator<String> eventIterator() {
        // 이벤트를 생성하는 코드
        return Stream.generate(UUID::randomUUID)
                .map(UUID::toString)
                .iterator();
    }
}



정리

...

ChatGPT가 만족할만한 코드를 작성 못해, 이후 사용할코드를 직접 작성하였습니다.

고성능 비동기 배치 스트림처리코드 : java-labs/SimpleActorTest.java at master · psmon/java-labs (github.com)


Code Block
themeEmacs
    @Test
    @DisplayName("Actor - SafeBatchActor Test")
    public void SafeBatchActor(){

        // ThrottlerTest(초당제한) + TimerActor(초당처리)를 조합하여 스마트한 고성능 준실시간성 벌크처리를 작성할수 있습니다.
        // 심플한 코드로 FSMBatch 처리기와 같은 기능을 만들수 있습니다.

        new TestKit(system) {
            {
                final Materializer materializer = ActorMaterializer.create(system);

                ActorRef batchActor = system.actorOf(SafeBatchActor.Props(), "batchActor");

                int processCouuntPerSec = 100;
                final ActorRef throttler =
                        Source.actorRef(1000, OverflowStrategy.dropNew())
                                .throttle(processCouuntPerSec, FiniteDuration.create(1, TimeUnit.SECONDS),
                                        processCouuntPerSec, (ThrottleMode) ThrottleMode.shaping())
                                .to(Sink.actorRef(batchActor, akka.NotUsed.getInstance()))
                                .run(materializer);


                for(int i=0 ; i<10000 ; i++){
                    throttler.tell("#### Hello World!",ActorRef.noSender());
                }

                expectNoMessage(Duration.ofSeconds(10));

            }
        };
    }