이벤트 소싱(및 샤딩) 은 커다란 웹사이트를 수십억명의 사용자 규모로 확장하는 아이디어이며

이아이디어는 매우 간답합니다. 메시지가 발생할때 명령의 이벤트 목록을 생성하고 저장을 합니다.

이 계획은 이벤트만이 저장소에 추가되고 아무것도 변이되지 않는 점이며 

이를 통해 이벤트 스트림의 소비자를 완벽하게 복제하고 확장할수 있습니다. 

이벤트 소싱을 실현하기 위해서, AKKA에서 PersistenceActor 가 사용이 됩니다.



Event Driven

이벤트 소싱은 간단한 컨셉이나, 이러한 컨셉을 적용하는것이 어려운 이유중하나는

DB에 마지막 결과만을 저장하는 데이터중심적 설계가 아닌 메시지 기반으로한 이벤트 중심적(Event Driven)설계가 선행이 되어야하며

이것에대한 이해및 설계가 익숙하지 않는데 있으며 기존 설계를 바꾸는것 역시 쉽지 않은데 있습니다.

방식전통적인 3 tier 아키텍이벤트 중심적 아키텍

프레젠테이션 : 사용자가 시스템과 상호 작용하는 사용자 인터페이스

응용 프로그램 : "비즈니스 로직" 수행 일반적으로 상태를 유지하지 않고 작업을 수행합니다. 

지속성 (Persistence) : 일반적으로 데이터베이스에 저장하는 장소.

동기식 요청 - 응답 작업 (예 :REST / JSON)

서비스가 생성하는 이벤트

서비스가 소비하는 이벤트

일괄 읽기 및 쓰기 (예 : 서비스에서 분석 시스템으로 데이터를 추출하는 ETL)

Event Driven은 Eventsourcing을 하기 위한 선행과제이며 이것만 으로 큰 주제 이기 때문에 아래링크를 포함하여 다양한 자료를 보시는것을 권장합니다.

참고링크:

이벤트드리븐(한글자료)

이벤트드리븐(영문자료)

EventSourcing

메시지설계

class Cmd implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String data;

    public Cmd(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}


class Evt implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String data;

    public Evt(String data) {
        this.data = data;
    }

    public String getData() {
        return data;
    }
}

class ExampleState implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ArrayList<String> events;

    public ExampleState() {
        this(new ArrayList<>());
    }

    public ExampleState(ArrayList<String> events) {
        this.events = events;
    }

    public ExampleState copy() {
        return new ExampleState(new ArrayList<>(events));
    }

    public void update(Evt evt) {
        events.add(evt.getData());
    }

    public int size() {
        return events.size();
    }

    @Override
    public String toString() {
        return events.toString();
    }
}

이벤트 소싱에 사용되는 메시지 설계는 간단합니다.

명령 발생에따라 이벤트가 발생을 하고, 이 이벤트 리스트를 추가하고 재생할수만 있으면 됩니다.


액터설계

주요설정
akka{
	persistence.max-concurrent-recoveries = 50	# 최대 유지수(장애발생시 복구 수)
	
	extensions = [akka.persistence.Persistence]
	  persistence {	  
	    journal {
        # leveldb or inmem : 이 옵션에따라 어플리케이션이 종료가 되어도 지정된 store방식을 통해 복원을 해줍니다. 
	      plugin = "akka.persistence.journal.inmem"
	      auto-start-journals = ["akka.persistence.journal.inmem"]
	    }	  
	    	
	    snapshot-store {
	      plugin = "akka.persistence.snapshot-store.local"
	      auto-start-snapshot-stores = ["akka.persistence.snapshot-store.local"]
	    }	
	  }
}
@Component
@Scope("prototype")
public class ExamplePersistentActor extends AbstractPersistentActor {
	  
    private ExampleState state = new ExampleState();
    
    private int snapShotInterval = 1000;

    public int getNumEvents() {
        return state.size();
    }

    @Override
    public String persistenceId() { return "ExamplePersistentActor-id-1"; }

    @Override
    public Receive createReceiveRecover() {
        return receiveBuilder()
            .match(Evt.class, state::update)
            .match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot())
            .build();
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Cmd.class, c -> {
              final String data = c.getData();
              final Evt evt = new Evt(data + "-" + getNumEvents());
              persist(evt, (Evt e) -> {
                  state.update(e);
                  getContext().getSystem().eventStream().publish(e);
                  if (lastSequenceNr() % snapShotInterval == 0 && lastSequenceNr() != 0)
                      // IMPORTANT: create a copy of snapshot because ExampleState is mutable
                      saveSnapshot(state.copy());
              });
            })
            .matchEquals("print", s -> System.out.println(state) )	//이벤트를 재생
            .build();
    }	
}

명령에대한 이벤트목록을 생성하고, 재생(print) 하는 것만을 구현하였으며

이것에대한 지속력및 복원력전략을 어떻게 할것인가는? 옵션을 통해 지정가능합니다.

이벤트 소싱을 구현하는 방법은 AKKA가 아니여도 여러가지 방법이 있겠으나

AKKA에서는 주로 Actor모델을 확장해 여러가지 분산처리에대한 문제를

적은 코드를 사용해 일괄적으로 해결하려고 합니다.


Test-사용예

	protected void persistenceEventSrc(ActorSystem system,SpringExtension ext)  {
	    new TestKit(system) {{
	    	ActorRef probe = getRef();	    	
	    	
			Props examplePersistentActor = ext.props("examplePersistentActor");
			
			System.out.println("eventActor 액터생성");
			ActorRef eventActor = system.actorOf(examplePersistentActor, "eventActor");
			
			System.out.println("event 생성");
			eventActor.tell(new Cmd("test1"), ActorRef.noSender());
			eventActor.tell(new Cmd("test2"), ActorRef.noSender());
			eventActor.tell(new Cmd("test3"), ActorRef.noSender());
			eventActor.tell(new Cmd("test4"), ActorRef.noSender());
			
			System.out.println("event 재생");
			eventActor.tell( "print" , ActorRef.noSender());			
			expectNoMessage(java.time.Duration.ofSeconds(1));
			
			System.out.println("eventActor 종료또는 비정상종료");
			eventActor.tell( akka.actor.PoisonPill.getInstance() , ActorRef.noSender());			
			expectNoMessage(java.time.Duration.ofSeconds(1));
			
			System.out.println("eventActor 재생성");
			ActorRef eventActor2 = system.actorOf(examplePersistentActor, "eventActor");
			
			System.out.println("event 복원확인");
			eventActor2.tell( "print" , ActorRef.noSender());
			
			expectNoMessage(java.time.Duration.ofSeconds(1));
				        
	    }};
	}

[INFO] [06/11/2018 10:37:26.066] [main] [akka.persistence.Persistence(akka://AkkaTestApp)] Auto-starting journal plugin `akka.persistence.journal.inmem`
[INFO] [06/11/2018 10:37:26.083] [main] [akka.persistence.Persistence(akka://AkkaTestApp)] Auto-starting snapshot store `akka.persistence.snapshot-store.local`
eventActor 액터생성
event 생성
event 재생
[test1-0, test2-1, test3-2, test4-3]
eventActor 종료또는 비정상종료
eventActor 재생성
event 복원확인
[test1-0, test2-1, test3-2, test4-3]

이벤트 소싱의 Flow를 간단하게 확인할수 있는 TestCode입니다. 특정한 기능 목적을 두지는 않았으며

이 코드는 장바구니에 그대로 활용될수 있으며,  장바구니에 상품을 담고뺀 순서를 알수 있으며( 장바구니의 이벤트를 기록및 재생)

마지막 장바구니에 담긴 물건의 최종상태도 알수가 있습니다.( 이것을 어떻게 서비스할지는, 이벤트를 소비하는 서비스의  역활  )

필요에 따라 분산된 저장소에 확장이 가능합니다.   


참고자료: 





  • No labels
Write a comment…