Page History
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Component @Scope("prototype") public class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery { private final ActorSelection destination; private int msgCnt = 0; public MyPersistentActor(ActorSelection destination) { this.destination =destination; } @Override public String persistenceId() { return "persistence-id"; } @Override public Receive createReceive() { return receiveBuilder(). match(String.class, s -> { //메시지가 전송중 persist(new MsgSent(s), evt -> updateState(evt)); }). match(Confirm.class, confirm -> { this.msgCnt++; if(this.msgCnt % 2 == 0) { //의도적으로 짝수메시지만 확인함... persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt)); System.out.println("msg ok :" + confirm.deliveryId ); }else { //의도적으로의도적 으로 메시지 확인을 처리를 드롭...(다음번에 성공되어야함) System.out.println("drop msg for test:" + confirm.deliveryId ); } }). build(); } @Override public Receive createReceiveRecover() { return receiveBuilder(). match(Object.class, evt -> updateState(evt)).build(); } void updateState(Object event) { if (event instanceof MsgSent) { final MsgSent evt = (MsgSent) event; deliver(destination, deliveryId -> new Msg(deliveryId, evt.s)); } else if (event instanceof MsgConfirmed) { final MsgConfirmed evt = (MsgConfirmed) event; confirmDelivery(evt.deliveryId); } } } |
...