Page History
...
Info |
---|
akka stream을 이용해서 불특정 이벤트를 고성능으로 저장하는 프로그램을 자바로 작성해작성해죠 |
Akka Stream을 사용하여 불특정 이벤트를 고성능으로 저장하는 자바 프로그램을 작성하는 방법에 대해 설명드리겠습니다.
...
Code Block | ||||
---|---|---|---|---|
| ||||
RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).to(sink); |
3.전체 코드는 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; import akka.stream.IOResult; import akka.stream.ThrottleMode; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Flow; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import java.nio.file.Paths; import java.time.Duration; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.stream.Stream; public class EventStorageApp { public static void main(String[] args) { ActorSystem system = ActorSystem.create(); ActorMaterializer materializer = ActorMaterializer.create(system); Source<String, NotUsed> source = Source.fromIterator(() -> eventIterator()); Sink<String, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file")); Flow<String, String, NotUsed> throttle = Flow.of(String.class) .throttle(60, Duration.ofSeconds(1)); RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).to(sink); CompletionStage<IOResult> completion = runnableGraph.run(materializer); completion.thenAccept(result -> { System.out.println("File write result: " + result); system.terminate(); }); } private static Iterator<String> eventIterator() { return Stream.generate(() -> UUID.randomUUID().toString()).iterator(); } } |
Info |
---|
위 코드를 상태머신을 이용하여 3초동안 모은 데이터를 저장할수 있게 수정해죠 |
...
Code Block | ||||
---|---|---|---|---|
| ||||
Flow<String, String, NotUsed> flow = Flow.of(String.class)
.statefulMapConcat(() -> {
EventBuffer buffer = new EventBuffer(State.COLLECTING);
return event -> {
Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event);
buffer = result._1;
result._2.run
|
Info |
---|
이 코드를 akka를 사용하지 않고 java stream 을 사용하여 변환해죠 |
위의 코드를 Java Stream API를 사용하여 변환하는 방법은 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class EventStorageApp {
public static void main(String[] args) throws InterruptedException {
List<String> events = new ArrayList<>();
final Duration collectDuration = Duration.ofSeconds(3);
final Duration writeDuration = Duration.ofMillis(10);
Instant end;
while (true) {
Iterator<String> eventIterator = eventIterator();
end = Instant.now().plus(collectDuration);
// 3초간 이벤트를 모읍니다.
while (eventIterator.hasNext() && Instant.now().isBefore(end)) {
events.add(eventIterator.next());
}
if (Instant.now().isAfter(end)) {
// 3초가 지나면 이벤트를 저장합니다.
try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file"))) {
for (String event : events) {
writer.write(event);
writer.newLine();
}
events.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
Thread.sleep(writeDuration.toMillis());
}
}
private static Iterator<String> eventIterator() {
// 이벤트를 생성하는 코드
return Stream.generate(UUID::randomUUID)
.map(UUID::toString)
.iterator();
}
} |
정리
...
ChatGPT가 만족할만한 코드를 작성 못해, 이후 사용할코드를 직접 작성하였습니다.
고성능 비동기 배치 스트림처리코드 : java-labs/SimpleActorTest.java at master · psmon/java-labs (github.com)
Code Block | ||
---|---|---|
| ||
@Test @DisplayName("Actor - SafeBatchActor Test") public void SafeBatchActor(){ // ThrottlerTest(초당제한) + TimerActor(초당처리)를 조합하여 스마트한 고성능 준실시간성 벌크처리를 작성할수 있습니다. // 심플한 코드로 FSMBatch 처리기와 같은 기능을 만들수 있습니다. new TestKit(system) { { final Materializer materializer = ActorMaterializer.create(system); ActorRef batchActor = system.actorOf(SafeBatchActor.Props(), "batchActor"); int processCouuntPerSec = 100; final ActorRef throttler = Source.actorRef(1000, OverflowStrategy.dropNew()) .throttle(processCouuntPerSec, FiniteDuration.create(1, TimeUnit.SECONDS), processCouuntPerSec, (ThrottleMode) ThrottleMode.shaping()) .to(Sink.actorRef(batchActor, akka.NotUsed.getInstance())) .run(materializer); for(int i=0 ; i<10000 ; i++){ throttler.tell("#### Hello World!",ActorRef.noSender()); } expectNoMessage(Duration.ofSeconds(10)); } }; } |