At-Least-Once Delivery - 적어도 메시지를 한번 보내려는 메카니즘으로 PersistenceActor와 결합하여

실패없는 메시지 전송 목표를 달성할수 있습니다. 적어도 한번 보내려는 메카니즘으로 인해

중복 메시지 발생할수 있음에 유의하여 작성 해보겠습니다.


(lightbulb) http://erlang.org/faq/academic.html#idp32880720 : 초창기에는 메시지 전송 보장을 위해 네트워크 에러를 포함하여

여러가지 에러를 캐치하여 재접속및 재선송 루틴으로 해결하려고 하였습니다. 하지만 이러한 전략은 대부분 실패를 하였고

최종 수신처가 확인해주는 방법이 보증을 할수 있는 가장 간단한 방법이며 언랭에서 도입되고 성공한 전송전략입니다.

이것은 전송한 메시지가 수신처에 도달되는 것을 보장되기를 원할때 유용합니다.

아주 착한 택배 서비스 시나리오를 가정해봅시다.

택배 아저씨는 택배를 대상에게  일반적으로 아주 빠르게 대부분 성공을 하며, 물건 분실을

방지하고자 꼭 수신받았음을 확인을 합니다. 하지만 받을사람이 부재중이라고 가정해봅시다.

택배 아저씨는 자신이 해야할  일을 하는동시에,  부재중을 체크하면서 다른 고객의 물건도 보내려고 할것입니다.

결국 자신이 배달해야할 모든 택배를 모두 보내게 되며, 물건은 복제가 안되기때문에 중복없이 꼭 한번만 보내게 됩니다.

상대편이 편지를 수신을 받았나? 일반 우편과는 다른 메시지처리방식입니다.

여기서 리얼세계랑 차이가 하나더 있다고하면, 알수없는 이유로 메시지 유실될시 , 메시지 전송이 확인되기 전까지

원복 메시지를 보관하고 있으며 재전송에따른 비용이 적으며(새로운 물건을 준비하는것보다.)

메시지를 받을때까지 재전송을 할수가 있다란 것입니다.

-중복 전송 메시지 전송이 계속 발생하는것과 중복을 제거하는 방법은 별도의 전략으로 추가설계가 가능합니다.


메시지 Flow 설계

위와같은 Flow의 기능을 가진 메시지 전송기는 범용적으로 여러요소에 많이 사용이 되며

최소의 코드로 작성해 보겠습니다.


메시지와 확인된 메시지 정의하기

의존설정(메이븐+akka conf)
//메이븐 설정
		<dependency>
		  <groupId>com.typesafe.akka</groupId>
		  <artifactId>akka-persistence_2.12</artifactId>
		  <version>${akka.version}</version>
		</dependency>		  
		
		<dependency>
			<groupId>org.fusesource.leveldbjni</groupId>
  			<artifactId>leveldbjni-all</artifactId>
  			<version>1.8</version>  
		</dependency>



//Akka Persistence설정 : 다양한 복구옵션과 다양한 저장장치선택이 가능합니다. 여기서는 기본 local-disk 장치를 이용합니다.
//작동중에만 고성능으로 영속성을 사용한다고하면  akka.persistence.journal.inmem 로 설정만 교체하면 되며
//영구 저장은 스냅샷 전략을 통해 db에 저장이 가능합니다.
akka{
	extensions = [akka.persistence.Persistence]
	  persistence {	  
	    journal {
	      plugin = "akka.persistence.journal.leveldb"
	      auto-start-journals = ["akka.persistence.journal.leveldb"]
	    }
	    snapshot-store {
	      plugin = "akka.persistence.snapshot-store.local"
	      auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
	    }	
	  }	
}

class Msg implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;
  public final String s;

  public Msg(long deliveryId, String s) {
    this.deliveryId = deliveryId;
    this.s = s;
  }
}

class Confirm implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;

  public Confirm(long deliveryId) {
    this.deliveryId = deliveryId;
  }
}

class MsgSent implements Serializable {
  private static final long serialVersionUID = 1L;
  public final String s;

  public MsgSent(String s) {
    this.s = s;
  }
}
class MsgConfirmed implements Serializable {
  private static final long serialVersionUID = 1L;
  public final long deliveryId;

  public MsgConfirmed(long deliveryId) {
    this.deliveryId = deliveryId;
  }
}


메시지가 실패시 재전송이 되게끔하는 구조설계는 간단합니다. 

  • 보낼/보낸 메시지 정의
  • 확인필요/확인된 메시지 정의
  • 메시지를 유니크(deliveryId) 하게 구분하는 식별자

AtLeastOnceDelivery 액터 설계

송신자 설계

@Component
@Scope("prototype")
public class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
	  private final ActorSelection destination;
	  
	  private int	msgCnt = 0;	  

	  public MyPersistentActor(ActorSelection destination) {		  
	      this.destination =destination;
	  }

	  @Override public String persistenceId() {
	    return "persistence-id";
	  }

	  @Override
	  public Receive createReceive() {
	    return receiveBuilder().
	      match(String.class, s -> {
	    	//메시지가 전송중
	        persist(new MsgSent(s), evt -> updateState(evt));	        	        
	      }).
	      match(Confirm.class, confirm -> {
	    	this.msgCnt++;
	    	  
	    	if(this.msgCnt % 2 == 0) {
		    	//의도적으로 짝수메시지만 확인함...
		        persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt));		        
		        System.out.println("msg ok :" + confirm.deliveryId  );
	    	}else {
	    		//의도적 으로 메시지 확인을 드롭...(다음번에 성공되어야함)
	    		System.out.println("drop msg for test:" + confirm.deliveryId );
	    	}
	      }).
	      build();
	  }

	  @Override
	  public Receive createReceiveRecover() {
	    return receiveBuilder().
	        match(Object.class, evt -> updateState(evt)).build();
	  }

	  void updateState(Object event) {
	    if (event instanceof MsgSent) {
	      final MsgSent evt = (MsgSent) event;
	      deliver(destination, deliveryId -> new Msg(deliveryId, evt.s));
	    } else if (event instanceof MsgConfirmed) {
	      final MsgConfirmed evt = (MsgConfirmed) event;
	      confirmDelivery(evt.deliveryId);
	    }
	  }
}
AbstractPersistentActorWithAtLeastOnceDelivery를 상속받으면 확인이되지 않는(실패한) 메시지에대해

재전송을 할수있는 액터 설계를 간단하게 할수가 있습니다.


수신자 설계

@Component
@Scope("prototype")
public class MyDestination extends AbstractActor {
	  @Override
	  public Receive createReceive() {
	    return receiveBuilder()
	      .match(Msg.class, msg -> {
	    	//메시지를 받고 할일을 하고..
	    	//....	    	  
	    	  
	    	//수신 메시지 확인됨을 보냄..
	        getSender().tell(new Confirm(msg.deliveryId), getSelf());
	      })
	      .build();
	  }
}


Test

	protected void persistenceMessage(ActorSystem system,SpringExtension ext)  {
	    new TestKit(system) {{
	    	ActorRef probe = getRef();
	    	//메시지 목적대상 액터 생성
	    	final ActorRef myDestination = system.actorOf(ext.props("myDestination"),"myDes");	    	
	    	final ActorSelection myDes = system.actorSelection("/user/myDes");
	    	
	    	//재전송가능한 메시지 발송기 생성 ( 옵션은 목적대상 액터선택자 ) 
	        final ActorRef myPersistentActor = system.actorOf( ext.props("myPersistentActor",myDes ), "myPersistentActor"  );
	        
	        for(int i=0;i<10;i++) {
	        	myPersistentActor.tell( "hi " + i ,ActorRef.noSender() );	        	
	        }
	        
	        //장애 시나리오를 가정하고,메시지가 모두 처리되는 시간 25초로 가정
	        expectNoMessage(java.time.Duration.ofSeconds(25));	        
	    }};
	}


의도적으로 드랍한 메시지가 결국 성공됨을 확인할수가 있습니다.

drop msg for test:1
msg ok :2
drop msg for test:3
msg ok :4
drop msg for test:5
msg ok :6
drop msg for test:7
msg ok :8
drop msg for test:9
msg ok :10
drop msg for test:1
msg ok :3
drop msg for test:5
msg ok :7
drop msg for test:9
msg ok :1
drop msg for test:5
msg ok :9
drop msg for test:5
msg ok :5



  • No labels
Write a comment…