Page History
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Component @Scope("prototype") public class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery { private final ActorSelection destination; private ActorRef testActor;int msgCnt = 0; public MyPersistentActor(ActorSelection destination) { this.destination =destination; } @Override public String persistenceId() { return "persistence-id"; } @Override public Receive createReceive() { return receiveBuilder(). match(String.class, s -> { //메시지가 전송중 persist(new MsgSent(s), evt -> updateState(evt)); this.testActor = sender(); }). match(Confirm.class, confirm -> { this.msgCnt++; if(this.msgCnt % 2 == 0) { //전송된의도적으로 메시지가 확인이됨짝수메시지만 확인함... persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt)); //메시지 검증을 하기위한 추가확인(실제는 필요없는 코드) this.testActor.tell("testOK", ActorRef.noSender()) System.out.println("msg ok :" + confirm.deliveryId ); }else { //의도적으로 확인을 처리를 드롭... System.out.println("drop msg for test:" + confirm.deliveryId ); } }). build(); } @Override public Receive createReceiveRecover() { return receiveBuilder(). match(Object.class, evt -> updateState(evt)).build(); } void updateState(Object event) { if (event instanceof MsgSent) { final MsgSent evt = (MsgSent) event; deliver(destination, deliveryId -> new Msg(deliveryId, evt.s)); } else if (event instanceof MsgConfirmed) { final MsgConfirmed evt = (MsgConfirmed) event; confirmDelivery(evt.deliveryId); } } //복구옵션은 지정하지 않음 @Override public Recovery recovery() { return Recovery.none(); } } |
수신자 설계
Code Block | ||||
---|---|---|---|---|
| ||||
@Component @Scope("prototype") public class MyDestination extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match(Msg.class, msg -> { //메시지를 받았으니,받고 할일을 하고.. //.... //수신 메시지 확인됨을 보냄.. getSender().tell(new Confirm(msg.deliveryId), getSelf()); }) .build(); } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
protected void persistenceMessage(ActorSystem system,SpringExtension ext) { new TestKit(system) {{ //메시지 전송 확인을 위한 테스트액터(검증은 제 3자가 해야하는 컨셉) ActorRef probe = getRef(); //메시지 목적대상 액터 생성 final ActorRef myDestination = system.actorOf(ext.props("myDestination"),"myDes"); final ActorSelection myDes = system.actorSelection("/user/myDes"); //재전송가능한 메시지 발송기 생성 ( 옵션은 목적대상 액터선택자 ) final ActorRef myPersistentActor = system.actorOf( ext.props("myPersistentActor",myDes ), "myPersistentActor" ); //myPersistentActor 액터에게 메시지 발생for(int i=0;i<10;i++) { myPersistentActor.tell( "hi1",probehi " + i ,ActorRef.noSender() ); //메시지경로} myPersistentActor(송신) -> myDes(목적지) -> myPersistentActor(수신확인) -> probe(추가검증) expectMsg("testOK"); //장애 probe의 메시지함에 testOK메시지가 존재하면, 승인절차의 메시지 확인완료 시나리오를 가정하고,메시지가 모두 처리되는 시간 25초로 가정 //Akka의 TestToolkit은 복잡한 비동기 메시지 테스트를 심플하게 해줍니다.expectNoMessage(java.time.Duration.ofSeconds(25)); }}; } |