akka stream을 이용해서 불특정 이벤트를 고성능으로 저장하는 프로그램을 자바로 작성해죠
Akka Stream을 사용하여 불특정 이벤트를 고성능으로 저장하는 자바 프로그램을 작성하는 방법에 대해 설명드리겠습니다.
Maven 또는 Gradle을 사용하여 Akka Stream 라이브러리를 프로젝트에 추가합니다.
Source와 Sink를 정의합니다. Source는 이벤트를 생성하고, Sink는 이벤트를 저장하는 역할을 합니다. 이 예제에서는 파일 시스템에 이벤트를 저장하는 Sink를 사용합니다.
Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator()); Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file"));
3. 이벤트를 생성하는 메서드를 작성합니다. 이 예제에서는 무작위로 이벤트를 생성합니다.
private static Iterator<String> eventIterator() { return Stream.generate(() -> UUID.randomUUID().toString()).iterator(); }
4. Flow를 정의합니다. Flow는 Source에서 생성된 이벤트를 변환하거나 필터링할 수 있는 중간 처리 단계입니다. 이 예제에서는 Flow를 사용하지 않습니다.
Flow<String, String, NotUsed> flow = Flow.of(String.class);
5. Source, Flow, Sink를 결합하여 RunnableGraph를 생성합니다.
FunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(flow).to(sink);
6. RunnableGraph를 실행합니다.
CompletionStage<IOResult> completion = runnableGraph.run(materializer);
7. 결과를 처리합니다.
completion.thenAccept(result -> { System.out.println("File write result: " + result); system.terminate(); });
전체 코드는 다음과 같습니다.
import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.IOResult; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Flow; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import java.nio.file.Paths; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.stream.Stream; public class EventStorageApp { public static void main(String[] args) { ActorSystem system = ActorSystem.create(); ActorMaterializer materializer = ActorMaterializer.create(system); Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator()); Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file")); Flow<String, String, NotUsed> flow = Flow.of(String.class); RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(flow).to(sink); CompletionStage<IOResult> completion = runnableGraph.run(materializer); completion.thenAccept(result -> { System.out.println("File write result: " + result); system.terminate(); }); } private static Iterator<String> eventIterator() { return Stream.generate(() -> UUID.randomUUID().toString()).iterator(); } }
위 코드를 초당 60회 제약이 있는 저장방식으로 수정해죠
초당 60회 제약이 있는 저장방식으로 위에서 작성한 코드를 수정하는 방법은 다음과 같습니다.
- akka.stream.Throttle 유틸리티 클래스를 사용하여 초당 이벤트 생성 속도를 제한하는 Flow를 추가합니다. 이 예제에서는 초당 60개의 이벤트를 생성하도록 설정합니다.
Flow<String, String, NotUsed> throttle = Flow.of(String.class) .throttle(60, Duration.ofSeconds(1));
2.Source와 Sink, Flow를 결합하여 RunnableGraph를 생성합니다. Flow는 throttle을 적용한 후 Sink로 이벤트를 전달합니다.
RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).to(sink);
3.전체 코드는 다음과 같습니다.
import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.IOResult; import akka.stream.ThrottleMode; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Flow; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import java.nio.file.Paths; import java.time.Duration; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.stream.Stream; public class EventStorageApp { public static void main(String[] args) { ActorSystem system = ActorSystem.create(); ActorMaterializer materializer = ActorMaterializer.create(system); Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator()); Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file")); Flow<String, String, NotUsed> throttle = Flow.of(String.class) .throttle(60, Duration.ofSeconds(1)); RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).to(sink); CompletionStage<IOResult> completion = runnableGraph.run(materializer); completion.thenAccept(result -> { System.out.println("File write result: " + result); system.terminate(); }); } private static Iterator<String> eventIterator() { return Stream.generate(() -> UUID.randomUUID().toString()).iterator(); } }
위 코드를 상태머신을 이용하여 3초동안 모은 데이터를 저장할수 있게 수정해죠
- 상태머신의 상태를 나타내는 클래스를 정의합니다. 이 클래스는 상태머신이 현재 어떤 상태인지를 나타내는 state 필드와, 현재 상태에서 수집한 이벤트 목록을 저장하는 events 필드로 구성됩니다.
public class EventBuffer { private List<String> events = new ArrayList<>(); private State state; public EventBuffer(State state) { this.state = state; } public void addEvent(String event) { events.add(event); } public boolean isFull() { return events.size() >= 60; } public List<String> getEvents() { return events; } public State getState() { return state; } public void setState(State state) { this.state = state; } }
2.현재 상태와 이벤트를 입력으로 받아 새로운 상태와 다음 단계에 수행할 액션을 반환하는 함수를 정의합니다.
public class StateTransition { public static BiFunction<EventBuffer, String, Tuple2<EventBuffer, Runnable>> apply(State state) { switch (state) { case COLLECTING: return collecting(); case SAVING: return saving(); default: throw new IllegalArgumentException("Invalid state: " + state); } } private static BiFunction<EventBuffer, String, Tuple2<EventBuffer, Runnable>> collecting() { return (buffer, event) -> { buffer.addEvent(event); if (buffer.isFull()) { buffer.setState(State.SAVING); return Tuple.of(buffer, () -> System.out.println("Saving buffer: " + buffer.getEvents())); } return Tuple.of(buffer, () -> {}); }; } private static BiFunction<EventBuffer, String, Tuple2<EventBuffer, Runnable>> saving() { return (buffer, event) -> { throw new IllegalStateException("Invalid state: " + buffer.getState()); }; } }
collecting
함수는 현재 상태가 COLLECTING
인 경우, 이벤트를 현재 버퍼에 추가하고 버퍼가 꽉 찼을 경우 다음 상태를 SAVING
으로 변경하고 버퍼를 저장하는 액션을 반환합니다. saving
함수는 현재 상태가 SAVING
인 경우, 이벤트를 받으면 예외를 발생시키는 액션을 반환합니다.
3. akka stream의 statefulMapConcat
함수를 사용하여 상태머신을 구현합니다. statefulMapConcat
함수는 입력 스트림의 각 요소에 대해 상태머신 함수를 적용하고 반환된 액션을 실행하는 역할을 합니다.
Flow<String, String, NotUsed> flow = Flow.of(String.class) .statefulMapConcat(() -> { EventBuffer buffer = new EventBuffer(State.COLLECTING); return event -> { Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event); buffer = result._1; result._2.run
이 코드를 akka를 사용하지 않고 java stream 을 사용하여 변환해죠
위의 코드를 Java Stream API를 사용하여 변환하는 방법은 다음과 같습니다.
import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class EventStorageApp { public static void main(String[] args) throws InterruptedException { List<String> events = new ArrayList<>(); final Duration collectDuration = Duration.ofSeconds(3); final Duration writeDuration = Duration.ofMillis(10); Instant end; while (true) { Iterator<String> eventIterator = eventIterator(); end = Instant.now().plus(collectDuration); // 3초간 이벤트를 모읍니다. while (eventIterator.hasNext() && Instant.now().isBefore(end)) { events.add(eventIterator.next()); } if (Instant.now().isAfter(end)) { // 3초가 지나면 이벤트를 저장합니다. try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file"))) { for (String event : events) { writer.write(event); writer.newLine(); } events.clear(); } catch (IOException e) { e.printStackTrace(); } } Thread.sleep(writeDuration.toMillis()); } } private static Iterator<String> eventIterator() { // 이벤트를 생성하는 코드 return Stream.generate(UUID::randomUUID) .map(UUID::toString) .iterator(); } }
정리
ChatGPT가 만족할만한 코드를 작성 못해, 이후 사용할코드를 직접 작성하였습니다.
고성능 비동기 배치 스트림처리코드 : java-labs/SimpleActorTest.java at master · psmon/java-labs (github.com)
@Test @DisplayName("Actor - SafeBatchActor Test") public void SafeBatchActor(){ // ThrottlerTest(초당제한) + TimerActor(초당처리)를 조합하여 스마트한 고성능 준실시간성 벌크처리를 작성할수 있습니다. // 심플한 코드로 FSMBatch 처리기와 같은 기능을 만들수 있습니다. new TestKit(system) { { final Materializer materializer = ActorMaterializer.create(system); ActorRef batchActor = system.actorOf(SafeBatchActor.Props(), "batchActor"); int processCouuntPerSec = 100; final ActorRef throttler = Source.actorRef(1000, OverflowStrategy.dropNew()) .throttle(processCouuntPerSec, FiniteDuration.create(1, TimeUnit.SECONDS), processCouuntPerSec, (ThrottleMode) ThrottleMode.shaping()) .to(Sink.actorRef(batchActor, akka.NotUsed.getInstance())) .run(materializer); for(int i=0 ; i<10000 ; i++){ throttler.tell("#### Hello World!",ActorRef.noSender()); } expectNoMessage(Duration.ofSeconds(10)); } }; }