You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 12 Next »

At-Least-Once Delivery - 적어도 메시지를 한번 보내려는 메카니즘으로 PersistenceActor와 결합하여

실패없는 메시지 전송 목표를 달성할수 있습니다. 적어도 한번 보내려는 메카니즘으로 인해

중복 메시지 발생할수 있음에 유의하여 작성 해보겠습니다.

이것은 전송한 메시지가 수신처에 도달되는 것을 보장되기를 원할때 유용합니다.

아주 착한 택배 서비스 시나리오를 가정해봅시다.

택배 아저씨는 택배를 대상에게  일반적으로 아주 빠르게 대부분 성공을 하며, 물건 분실을

방지하고자 꼭 수신받았음을 확인을 합니다. 하지만 받을사람이 부재중이라고 가정해봅시다.

택배 아저씨는 자신이 해야할  일을 하는동시에,  부재중을 체크하면서 다른 고객의 물건도 보내려고 할것입니다.

결국 자신이 배달해야할 모든 택배를 모두 보내게 되며, 물건은 복제가 안되기때문에 중복없이 꼭 한번만 보내게 됩니다.

상대편이 편지를 수신을 받았나? 일반 우편과는 다른 메시지처리방식입니다.


메시지 Flow 설계

위와같은 Flow의 기능을 가진 메시지 전송기는 범용적으로 여러요소에 많이 사용이 되며

최소의 코드로 작성해 보겠습니다.


메시지와 확인된 메시지 정의하기

class Msg implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;
  public final String s;

  public Msg(long deliveryId, String s) {
    this.deliveryId = deliveryId;
    this.s = s;
  }
}

class Confirm implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;

  public Confirm(long deliveryId) {
    this.deliveryId = deliveryId;
  }
}

class MsgSent implements Serializable {
  private static final long serialVersionUID = 1L;
  public final String s;

  public MsgSent(String s) {
    this.s = s;
  }
}
class MsgConfirmed implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;

  public MsgConfirmed(long deliveryId) {
    this.deliveryId = deliveryId;
  }
}


메시지는 구조설계는 간단합니다. 

  • 보낼/보낸 메시지 정의
  • 확인필요/확인된 메시지 정의
  • 메시지를 유니크하게 인식하는 방법

AtLeastOnceDelivery 액터 설계

송신자 설계

@Component
@Scope("prototype")
public class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
	  private final ActorSelection destination;
	  
	  private int	msgCnt = 0;	  

	  public MyPersistentActor(ActorSelection destination) {		  
	      this.destination =destination;
	  }

	  @Override public String persistenceId() {
	    return "persistence-id";
	  }

	  @Override
	  public Receive createReceive() {
	    return receiveBuilder().
	      match(String.class, s -> {
	    	//메시지가 전송중
	        persist(new MsgSent(s), evt -> updateState(evt));	        	        
	      }).
	      match(Confirm.class, confirm -> {
	    	this.msgCnt++;
	    	  
	    	if(this.msgCnt % 2 == 0) {
		    	//의도적으로 짝수메시지만 확인함...
		        persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt));		        
		        System.out.println("msg ok :" + confirm.deliveryId  );
	    	}else {
	    		//의도적 으로 메시지 확인을 드롭...(다음번에 성공되어야함)
	    		System.out.println("drop msg for test:" + confirm.deliveryId );
	    	}
	      }).
	      build();
	  }

	  @Override
	  public Receive createReceiveRecover() {
	    return receiveBuilder().
	        match(Object.class, evt -> updateState(evt)).build();
	  }

	  void updateState(Object event) {
	    if (event instanceof MsgSent) {
	      final MsgSent evt = (MsgSent) event;
	      deliver(destination, deliveryId -> new Msg(deliveryId, evt.s));
	    } else if (event instanceof MsgConfirmed) {
	      final MsgConfirmed evt = (MsgConfirmed) event;
	      confirmDelivery(evt.deliveryId);
	    }
	  }
}


수신자 설계

@Component
@Scope("prototype")
public class MyDestination extends AbstractActor {
	  @Override
	  public Receive createReceive() {
	    return receiveBuilder()
	      .match(Msg.class, msg -> {
	    	//메시지를 받고 할일을 하고..
	    	//....	    	  
	    	  
	    	//수신 메시지 확인됨을 보냄..
	        getSender().tell(new Confirm(msg.deliveryId), getSelf());
	      })
	      .build();
	  }
}


Test

	protected void persistenceMessage(ActorSystem system,SpringExtension ext)  {
	    new TestKit(system) {{
	    	ActorRef probe = getRef();
	    	//메시지 목적대상 액터 생성
	    	final ActorRef myDestination = system.actorOf(ext.props("myDestination"),"myDes");	    	
	    	final ActorSelection myDes = system.actorSelection("/user/myDes");
	    	
	    	//재전송가능한 메시지 발송기 생성 ( 옵션은 목적대상 액터선택자 ) 
	        final ActorRef myPersistentActor = system.actorOf( ext.props("myPersistentActor",myDes ), "myPersistentActor"  );
	        
	        for(int i=0;i<10;i++) {
	        	myPersistentActor.tell( "hi " + i ,ActorRef.noSender() );	        	
	        }
	        
	        //장애 시나리오를 가정하고,메시지가 모두 처리되는 시간 25초로 가정
	        expectNoMessage(java.time.Duration.ofSeconds(25));	        
	    }};
	}


의도적으로 드랍한 메시지가 결국 성공됨을 확인할수가 있습니다.

drop msg for test:86
msg ok :87
drop msg for test:88
msg ok :89
drop msg for test:90
msg ok :91
drop msg for test:92
msg ok :93
drop msg for test:94
msg ok :95
drop msg for test:86
msg ok :88
drop msg for test:90
msg ok :92
drop msg for test:94
msg ok :86
drop msg for test:90
msg ok :94
drop msg for test:90
msg ok :90






  • No labels