Page History
...
Code Block | ||||
---|---|---|---|---|
| ||||
Flow<String, String, NotUsed> flow = Flow.of(String.class) .statefulMapConcat(() -> { EventBuffer buffer = new EventBuffer(State.COLLECTING); return event -> { Tuple2<EventBuffer, Runnable> result = StateTransition.apply(buffer, event).apply(buffer, event); buffer = result._1; result._2.run |
Info |
---|
이 코드를 akka를 사용하지 않고 java stream 을 사용하여 변환해죠 |
위의 코드를 Java Stream API를 사용하여 변환하는 방법은 다음과 같습니다.
Code Block | ||||
---|---|---|---|---|
| ||||
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class EventStorageApp {
public static void main(String[] args) throws InterruptedException {
List<String> events = new ArrayList<>();
final Duration collectDuration = Duration.ofSeconds(3);
final Duration writeDuration = Duration.ofMillis(10);
Instant end;
while (true) {
Iterator<String> eventIterator = eventIterator();
end = Instant.now().plus(collectDuration);
// 3초간 이벤트를 모읍니다.
while (eventIterator.hasNext() && Instant.now().isBefore(end)) {
events.add(eventIterator.next());
}
if (Instant.now().isAfter(end)) {
// 3초가 지나면 이벤트를 저장합니다.
try (BufferedWriter writer = new BufferedWriter(new FileWriter("/path/to/file"))) {
for (String event : events) {
writer.write(event);
writer.newLine();
}
events.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
Thread.sleep(writeDuration.toMillis());
}
}
private static Iterator<String> eventIterator() {
// 이벤트를 생성하는 코드
return Stream.generate(UUID::randomUUID)
.map(UUID::toString)
.iterator();
}
} |