Page History
...
Info |
---|
akka stream을 이용해서 불특정 이벤트를 고성능으로 저장하는 프로그램을 자바로 작성해작성해죠 |
Akka Stream을 사용하여 불특정 이벤트를 고성능으로 저장하는 자바 프로그램을 작성하는 방법에 대해 설명드리겠습니다.
Maven 또는 Gradle을 사용하여 Akka Stream 라이브러리를 프로젝트에 추가합니다.
Source와 Sink를 정의합니다. Source는 이벤트를 생성하고, Sink는 이벤트를 저장하는 역할을 합니다. 이 예제에서는 파일 시스템에 이벤트를 저장하는 Sink를 사용합니다.
Code Block | ||||
---|---|---|---|---|
| ||||
Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator()); Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file")); |
...
Code Block | ||||
---|---|---|---|---|
| ||||
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
public class EventStorageApp {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator());
Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file"));
Flow<String, String, NotUsed> flow = Flow.of(String.class);
RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(flow).to(sink);
CompletionStage<IOResult> completion = runnableGraph.run(materializer);
completion.thenAccept(result -> {
System.out.println("File write result: " + result);
system.terminate();
});
}
private static Iterator<String> eventIterator() {
return Stream.generate(() -> UUID.randomUUID().toString()).iterator();
}
}
|
Info |
---|
위 코드를 초당 60회 제약이 있는 저장방식으로 수정해죠 |
초당 60회 제약이 있는 저장방식으로 위에서 작성한 코드를 수정하는 방법은 다음과 같습니다.
- akka.stream.Throttle 유틸리티 클래스를 사용하여 초당 이벤트 생성 속도를 제한하는 Flow를 추가합니다. 이 예제에서는 초당 60개의 이벤트를 생성하도록 설정합니다.
Code Block | ||||
---|---|---|---|---|
| ||||
Flow<String, String, NotUsed> throttle = Flow.of(String.class) .throttle(60, Duration.ofSeconds(1)); |
...
Code Block | ||||
---|---|---|---|---|
| ||||
RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).to(sink); |
3.전체 코드는 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.IOResult; import akka.stream.ThrottleMode; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Flow; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import java.nio.file.Paths; import java.time.Duration; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.stream.Stream; public class EventStorageApp { public static void main(String[] args) { ActorSystem system = ActorSystem.create(); ActorMaterializer materializer = ActorMaterializer.create(system); Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator()); Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file")); Flow<String, String, NotUsed> throttle = Flow.of(String.class) .throttle(60, Duration.ofSeconds(1)); RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).to(sink); CompletionStage<IOResult> completion = runnableGraph.run(materializer); completion.thenAccept(result -> { System.out.println("File write result: " + result); system.terminate(); }); } private static Iterator<String> eventIterator() { return Stream.generate(() -> UUID.randomUUID().toString()).iterator(); } } |
Info |
---|
위 코드를 상태머신을 이용하여 3초동안 모은 데이터를 저장할수 있게 수정해죠 |
- 상태머신의 상태를 나타내는 클래스를 정의합니다. 이 클래스는 상태머신이 현재 어떤 상태인지를 나타내는 state 필드와, 현재 상태에서 수집한 이벤트 목록을 저장하는 events 필드로 구성됩니다.
Code Block | ||||
---|---|---|---|---|
| ||||
public class EventBuffer { private List<String> events = new ArrayList<>(); private State state; public EventBuffer(State state) { this.state = state; } public void addEvent(String event) { events.add(event); } public boolean isFull() { return events.size() >= 60; } public List<String> getEvents() { return events; } public State getState() { return state; } public void setState(State state) { this.state = state; } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class StateTransition { public static BiFunction<EventBuffer, String, Tuple2<EventBuffer, Runnable>> apply(State state) { switch (state) { case COLLECTING: return collecting(); case SAVING: return saving(); default: throw new IllegalArgumentException("Invalid state: " + state); } } private static BiFunction<EventBuffer, String, Tuple2<EventBuffer, Runnable>> collecting() { return (buffer, event) -> { buffer.addEvent(event); if (buffer.isFull()) { buffer.setState(State.SAVING); return Tuple.of(buffer, () -> System.out.println("Saving buffer: " + buffer.getEvents())); } return Tuple.of(buffer, () -> {}); }; } private static BiFunction<EventBuffer, String, Tuple2<EventBuffer, Runnable>> saving() { return (buffer, event) -> { throw new IllegalStateException("Invalid state: " + buffer.getState()); }; } } |
collecting
함수는 현재 상태가 COLLECTING
인 경우, 이벤트를 현재 버퍼에 추가하고 버퍼가 꽉 찼을 경우 다음 상태를 SAVING
으로 변경하고 버퍼를 저장하는 액션을 반환합니다. saving
함수는 현재 상태가 SAVING
인 경우, 이벤트를 받으면 예외를 발생시키는 액션을 반환합니다.
3. akka stream의 statefulMapConcat
함수를 사용하여 상태머신을 구현합니다. statefulMapConcat
함수는 입력 스트림의 각 요소에 대해 상태머신 함수를 적용하고 반환된 액션을 실행하는 역할을 합니다.
Code Block | ||||
---|---|---|---|---|
| ||||
Flow<String, String, NotUsed> flow = Flow.of(String.class) .statefulMapConcat(() -> { EventBuffer buffer = new EventBuffer(State.COLLECTING); return event -> { Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event); buffer = result._1; result._2.run |
Info |
---|
이 코드를 akka를 사용하지 않고 java stream 을 사용하여 변환해죠 |
위의 코드를 Java Stream API를 사용하여 변환하는 방법은 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class EventStorageApp {
public static void main(String[] args) throws InterruptedException {
List<String> events = new ArrayList<>();
final Duration collectDuration = Duration.ofSeconds(3);
final Duration writeDuration = Duration.ofMillis(10);
Instant end;
while (true) {
Iterator<String> eventIterator = eventIterator();
end = Instant.now().plus(collectDuration);
// 3초간 이벤트를 모읍니다.
while (eventIterator.hasNext() && Instant.now().isBefore(end)) {
events.add(eventIterator.next());
}
if (Instant.now().isAfter(end)) {
// 3초가 지나면 이벤트를 저장합니다.
try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file"))) {
for (String event : events) {
writer.write(event);
writer.newLine();
}
events.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
Thread.sleep(writeDuration.toMillis());
}
}
private static Iterator<String> eventIterator() {
// 이벤트를 생성하는 코드
return Stream.generate(UUID::randomUUID)
.map(UUID::toString)
.iterator();
}
} |
정리
...
ChatGPT가 만족할만한 코드를 작성 못해, 이후 사용할코드를 직접 작성하였습니다.
고성능 비동기 배치 스트림처리코드 : java-labs/SimpleActorTest.java at master · psmon/java-labs (github.com)
Code Block | ||
---|---|---|
| ||
@Test
@DisplayName("Actor - SafeBatchActor Test")
public void SafeBatchActor(){
// ThrottlerTest(초당제한) + TimerActor(초당처리)를 조합하여 스마트한 고성능 준실시간성 벌크처리를 작성할수 있습니다.
// 심플한 코드로 FSMBatch 처리기와 같은 기능을 만들수 있습니다.
new TestKit(system) {
{
final Materializer materializer = ActorMaterializer.create(system);
ActorRef batchActor = system.actorOf(SafeBatchActor.Props(), "batchActor");
int processCouuntPerSec = 100;
final ActorRef throttler =
Source.actorRef(1000, OverflowStrategy.dropNew())
.throttle(processCouuntPerSec, FiniteDuration.create(1, TimeUnit.SECONDS),
processCouuntPerSec, (ThrottleMode) ThrottleMode.shaping())
.to(Sink.actorRef(batchActor, akka.NotUsed.getInstance()))
.run(materializer);
for(int i=0 ; i<10000 ; i++){
throttler.tell("#### Hello World!",ActorRef.noSender());
}
expectNoMessage(Duration.ofSeconds(10));
}
};
} |