Page History
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
<!-- Maven setup -->

<!-- Akka Persistence module (Scala 2.12 build); the version is taken from the akka.version property. -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence_2.12</artifactId>
    <version>${akka.version}</version>
</dependency>

<!-- LevelDB JNI library, pinned to 1.8; the config on this page selects the leveldb journal plugin. -->
<dependency>
    <groupId>org.fusesource.leveldbjni</groupId>
    <artifactId>leveldbjni-all</artifactId>
    <version>1.8</version>
</dependency>
// Akka Persistence settings: various recovery options and storage backends can be chosen.
// This example uses the default local-disk storage.
akka {
  // Register the persistence extension with the actor system.
  extensions = [akka.persistence.Persistence]

  persistence {
    journal {
      // Journal plugin id used by persistent actors.
      plugin = "akka.persistence.journal.leveldb"
      // Journal plugins listed under the auto-start key.
      auto-start-journals = ["akka.persistence.journal.leveldb"]
    }

    snapshot-store {
      // Snapshot store plugin id.
      plugin = "akka.persistence.snapshot-store.local"
      // Snapshot store plugins listed under the auto-start key.
      auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
    }
  }
}
|
Expand | ||
---|---|---|
| ||
application.conf akka{ extensions = [akka.persistence.Persistence] } |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Message sent to the destination: the text payload together with the
 * delivery id that identifies this particular delivery.
 */
class Msg implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Identifier of this delivery. */
    public final long deliveryId;

    /** Text payload. */
    public final String s;

    public Msg(long deliveryId, String s) {
        this.deliveryId = deliveryId;
        this.s = s;
    }
}

/**
 * Acknowledgement carrying the delivery id of the {@link Msg} being confirmed.
 */
class Confirm implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Identifier of the delivery being acknowledged. */
    public final long deliveryId;

    public Confirm(long deliveryId) {
        this.deliveryId = deliveryId;
    }
}

/**
 * Event recording that a text payload has been handed over for sending.
 */
class MsgSent implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Text payload that is being sent. */
    public final String s;

    public MsgSent(String s) {
        this.s = s;
    }
}

/**
 * Event recording that the delivery with the given id has been confirmed.
 */
class MsgConfirmed implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Identifier of the confirmed delivery. */
    public final long deliveryId;

    public MsgConfirmed(long deliveryId) {
        this.deliveryId = deliveryId;
    }
}
메시지 전송이 실패했을 때 재전송이 되게끔 하는 구조 설계는 간단합니다.
- 보낼/보낸 메시지 정의
- 확인필요/확인된 메시지 정의
- 메시지를 유니크하게 구분하는 식별자(deliveryId) 정의
AtLeastOnceDelivery 액터 설계
...
Code Block | ||||
---|---|---|---|---|
| ||||
@Component @Scope("prototype") public class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery { private final ActorSelection destination; private int msgCnt = 0; public MyPersistentActor(ActorSelection destination) { this.destination =destination; } @Override public String persistenceId() { return "persistence-id"; } @Override public Receive createReceive() { return receiveBuilder(). match(String.class, s -> { //메시지가 전송중 persist(new MsgSent(s), evt -> updateState(evt)); }). match(Confirm.class, confirm -> { this.msgCnt++; if(this.msgCnt % 2 == 0) { //의도적으로 짝수메시지만 확인함... persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt)); System.out.println("msg ok :" + confirm.deliveryId ); }else { //의도적 으로 메시지 확인을 드롭...(다음번에 성공되어야함) System.out.println("drop msg for test:" + confirm.deliveryId ); } }). build(); } @Override public Receive createReceiveRecover() { return receiveBuilder(). match(Object.class, evt -> updateState(evt)).build(); } void updateState(Object event) { if (event instanceof MsgSent) { final MsgSent evt = (MsgSent) event; deliver(destination, deliveryId -> new Msg(deliveryId, evt.s)); } else if (event instanceof MsgConfirmed) { final MsgConfirmed evt = (MsgConfirmed) event; confirmDelivery(evt.deliveryId); } } } |
AbstractPersistentActorWithAtLeastOnceDelivery를 상속받으면 확인되지 않은(실패한) 메시지에 대해
재전송을 할 수 있는 액터를 간단하게 설계할 수 있습니다.
수신자 설계
Code Block | ||||
---|---|---|---|---|
| ||||
@Component @Scope("prototype") public class MyDestination extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match(Msg.class, msg -> { //메시지를 받고 할일을 하고.. //.... //수신 메시지 확인됨을 보냄.. getSender().tell(new Confirm(msg.deliveryId), getSelf()); }) .build(); } } |
...