Page History
...
Code Block | ||||
---|---|---|---|---|
| ||||
// toMat(sink, Keep.right()) keeps the sink's materialized CompletionStage<IOResult>;
// plain to(sink) would keep the source's materialized value instead and not match the declared type.
RunnableGraph<CompletionStage<IOResult>> runnableGraph = source.via(throttle).toMat(sink, akka.stream.javadsl.Keep.right()); |
3. 전체 코드는 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;

/**
 * Streams generated event ids through a throttle (60 elements per second)
 * into a file, using Akka Streams.
 */
public class EventStorageApp {

    /**
     * Builds and runs the stream, then terminates the actor system once the
     * file sink's completion stage finishes (successfully or not).
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create();
        ActorMaterializer materializer = ActorMaterializer.create(system);

        Source<String, NotUsed> source = Source.fromIterator(EventStorageApp::eventIterator);

        // FileIO.toPath consumes ByteString elements, so the sink is typed accordingly
        // and the String events are encoded just before they reach it.
        Sink<ByteString, CompletionStage<IOResult>> sink = FileIO.toPath(Paths.get("/path/to/file"));

        Flow<String, String, NotUsed> throttle = Flow.of(String.class)
                .throttle(60, Duration.ofSeconds(1));

        // toMat(..., Keep.right()) keeps the sink's CompletionStage<IOResult>;
        // to(sink) would keep the source's NotUsed and not match the declared type.
        RunnableGraph<CompletionStage<IOResult>> runnableGraph = source
                .via(throttle)
                .map(event -> ByteString.fromString(event + "\n")) // one event per line
                .toMat(sink, Keep.right());

        CompletionStage<IOResult> completion = runnableGraph.run(materializer);

        // whenComplete runs on failure as well, so the actor system is always shut down.
        completion.whenComplete((result, failure) -> {
            if (failure != null) {
                System.err.println("File write failed: " + failure);
            } else {
                System.out.println("File write result: " + result);
            }
            system.terminate();
        });
    }

    /**
     * Creates an unbounded iterator of random UUID strings used as events.
     *
     * @return an iterator that never runs out of elements
     */
    private static Iterator<String> eventIterator() {
        return Stream.generate(() -> UUID.randomUUID().toString()).iterator();
    }
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
Flow<String, String, NotUsed> flow = Flow.of(String.class)
.statefulMapConcat(() -> {
EventBuffer buffer = new EventBuffer(State.COLLECTING);
return event -> {
Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event);
buffer = result._1;
result._2.run
|
Info |
---|
이 코드를 Akka를 사용하지 않고 Java Stream을 사용하도록 변환해줘 |
위의 코드를 Java Stream API를 사용하여 변환하는 방법은 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class EventStorageApp { public static void main(String[] args) throws InterruptedException { List<String> events = new ArrayList<>(); final Duration collectDuration = Duration.ofSeconds(3); final Duration writeDuration = Duration.ofMillis(10); Instant end; while (true) { Iterator<String> eventIterator = eventIterator(); end = Instant.now().plus(collectDuration); // 3초간 이벤트를 모읍니다. while (eventIterator.hasNext() && Instant.now().isBefore(end)) { events.add(eventIterator.next()); } if (Instant.now().isAfter(end)) { // 3초가 지나면 이벤트를 저장합니다. try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file"))) { for (String event : events) { writer.write(event); writer.newLine(); } events.clear(); } catch (IOException e) { e.printStackTrace(); } } Thread.sleep(writeDuration.toMillis()); } } private static Iterator<String> eventIterator() { // 이벤트를 생성하는 코드 return Stream.generate(UUID::randomUUID) .map(UUID::toString) .iterator(); } } |
정리
...
ChatGPT가 만족할 만한 코드를 작성하지 못해, 이후 사용할 코드를 직접 작성하였습니다.
고성능 비동기 배치 스트림 처리 코드: java-labs/SimpleActorTest.java at master · psmon/java-labs (github.com)
Code Block | ||
---|---|---|
| ||
@Test
@DisplayName("Actor - SafeBatchActor Test")
public void SafeBatchActor(){
    // Combining ThrottlerTest (per-second rate limit) with TimerActor (per-second processing)
    // makes it possible to write smart, high-performance, near-real-time bulk processing.
    // Simple code can provide the same functionality as the FSMBatch processor.
    final TestKit probe = new TestKit(system);

    final Materializer materializer = ActorMaterializer.create(system);
    final ActorRef batchActor = system.actorOf(SafeBatchActor.Props(), "batchActor");

    // Throttle settings: the same value is used as the element count per interval and as the burst size.
    final int messagesPerSecond = 100;
    final FiniteDuration oneSecond = FiniteDuration.create(1, TimeUnit.SECONDS);
    final ThrottleMode shapingMode = (ThrottleMode) ThrottleMode.shaping();

    // Actor-backed source with a 1000-element buffer and the dropNew overflow strategy.
    final Source<Object, ActorRef> inbox = Source.actorRef(1000, OverflowStrategy.dropNew());

    // Materializing the graph yields the ActorRef that feeds the throttled stream into batchActor.
    final ActorRef throttler = inbox
            .throttle(messagesPerSecond, oneSecond, messagesPerSecond, shapingMode)
            .to(Sink.actorRef(batchActor, akka.NotUsed.getInstance()))
            .run(materializer);

    // Send 10000 messages to the throttler in a tight loop.
    int remaining = 10000;
    while (remaining > 0) {
        throttler.tell("#### Hello World!", ActorRef.noSender());
        remaining--;
    }

    // The probe itself expects no message for 10 seconds.
    probe.expectNoMessage(Duration.ofSeconds(10));
} |