You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 9 Next »

Actor API중 Become/Unbecome으로 간단한 상태변환 액터 설계가가능하지만

조금더 복잡한 상태머신이 필요할시 AbstatractFSM 객체 활용이 가능합니다.


상태머신 디자인샘플


언랭에서의 유한상태머신 디자인 컨셉에서 영향을 받았습니다.

http://erlang.org/documentation/doc-4.8.2/doc/design_principles/fsm.html


배치잡 상태머신설계

배치 처리기를 다음과 같이 설계한다고 가정해봅시다.

  • 메시지를 일정량 모아뒀다가 내가 원하는 타이밍에 모아서 처리하고 싶다.
  • 특정시간이 지나면, 모은량과 상관없이 자동으로 처리하고 싶다.


메시지가 불규칙적으로 생산된다고 가정하고, 메시지발생시마다 매번 DBwrite를 수행한다고 하면

DB의 성능저하를 일으키는 요소가될것입니다. 

1초란 시간은 네트워크에서 아주긴 시간입니다. 초당 100메시지 전송이 가능하다고 하면

어떠한 처리 함수를 100번호출하는것보다. 1번호출로 100개의데이터를 묶어서 처리하는것이 대부분 유용합니다.


항시 대기 배치 상태설계를 정의하면 다음과 같습니다.

  • 상태값은 대기와 활동상태 두개를 가진다.
  • 대기 상태일때 메시지가 들어오면 활동상태로 깨어난다.
  • 특정시간이 지나면 대기상태로 진입하며, 현재받은 메시지를 모두처리함
  • 필요하면 임의로 큐를 비워서(Fluse) 배치처리가 되도록합니다.



기능이 복잡해 보이지만, 위 내용을 StateChartDiagram으로 옮기면 

아래와같이 단순한 처리기를 설계한것입니다.

구현-기반구조체정의

import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.japi.pf.UnitMatch;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.io.Serializable;
import java.time.Duration;


public final class SetTarget  {
  private final ActorRef ref;

  public SetTarget(ActorRef ref) {
    this.ref = ref;
  }

  public ActorRef getRef() {
    return ref;
  }

  @Override
  public String toString() {
    return "SetTarget{" +
      "ref=" + ref +
      '}';
  }
}


public final class Queue  {
  private final Object obj;

  public Queue(Object obj) {
    this.obj = obj;
  }

  public Object getObj() {
    return obj;
  }

  @Override
  public String toString() {
    return "Queue{" +
      "obj=" + obj +
      '}';
  }
}


public final class Batch {
  private final List<Object> list;

  public Batch(List<Object> list) {
    this.list = list;
  }

  public List<Object> getList() {
    return list;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    Batch batch = (Batch) o;

    return list.equals(batch.list);
  }

  @Override
  public int hashCode() {
    return list.hashCode();
  }

  @Override
  public String toString() {
    final StringBuilder builder = new StringBuilder();
    builder.append( "Batch{list=");
    list.stream().forEachOrdered(e -> { builder.append(e); builder.append(","); });
    int len = builder.length();
    builder.replace(len, len, "}");
    return builder.toString();
  }
}


public enum Flush {
  Flush
}


//states
enum State {
Idle, Active
}

//state data
interface Data {
}

enum Uninitialized implements Data {
Uninitialized
}

final class Todo implements Data {
private final ActorRef target;
private final List<Object> queue;

public Todo(ActorRef target, List<Object> queue) {
 this.target = target;
 this.queue = queue;
}

public ActorRef getTarget() {
 return target;
}

public List<Object> getQueue() {
 return queue;
}

@Override
public String toString() {
 return "Todo{" +
   "target=" + target +
   ", queue=" + queue +
   '}';
}

public Todo addElement(Object element) {
 List<Object> nQueue = new LinkedList<>(queue);
 nQueue.add(element);
 return new Todo(this.target, nQueue);
}

public Todo copy(List<Object> queue) {
 return new Todo(this.target, queue);
}

public Todo copy(ActorRef target) {
 return new Todo(target, this.queue);
}
}





액터설계

@Component
@Scope("prototype")
public class Buncher extends AbstractFSM<State, Data> {	
	final static  com.psmon.cachedb.actors.fsm.State Idle = com.psmon.cachedb.actors.fsm.State.Idle;
	final static  com.psmon.cachedb.actors.fsm.Uninitialized Uninitialized = com.psmon.cachedb.actors.fsm.Uninitialized.Uninitialized;	
	final static public com.psmon.cachedb.actors.fsm.State Active = com.psmon.cachedb.actors.fsm.State.Active;

	{
	    startWith(Idle, Uninitialized);
	    when(Idle,
	      matchEvent(SetTarget.class, Uninitialized.class,
	        (setTarget, uninitialized) ->
	          stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
	
	    onTransition(
	      matchState(Active, Idle, () -> {
	        // reuse this matcher
	        final UnitMatch<Data> m = UnitMatch.create(
	          matchData(Todo.class,
	            todo -> todo.getTarget().tell(new Batch(todo.getQueue()), getSelf())));
	        m.match(stateData());
	      }).
	      state(Idle, Active, () -> {/* Do something here */}));
	
	    when(Active, Duration.ofSeconds(1L),
	      matchEvent(Arrays.asList(Flush.class, StateTimeout()), Todo.class,
	        (event, todo) -> goTo(Idle).using(todo.copy(new LinkedList<>()))));
	
	    whenUnhandled(
	      matchEvent(Queue.class, Todo.class,
	        (queue, todo) -> goTo(Active).using(todo.addElement(queue.getObj()))).
	        anyEvent((event, state) -> {
	          log().warning("received unhandled request {} in state {}/{}",
	            event, stateName(), state);
	          return stay();
	        }));
	
	    initialize();
	  }
}


액터테스트

	@Test
	public void contextLoads() {
		ActorSystem system = context.getBean(ActorSystem.class);
		SpringExtension ext = context.getBean(SpringExtension.class);				
		fsmTest(system,ext);		
		TestKit.shutdownActorSystem(system , scala.concurrent.duration.Duration.apply(5, TimeUnit.SECONDS ) ,true );		
	}
	
	protected void fsmTest(ActorSystem system,SpringExtension ext) {
	    new TestKit(system) {

		{
	        final ActorRef buncher =
	          system.actorOf(ext.props("buncher"));
	        
	        final ActorRef probe = getRef();

	        buncher.tell(new SetTarget(probe), probe);
	        buncher.tell(new Queue(42), probe);
	        buncher.tell(new Queue(43), probe);
	        LinkedList<Object> list1 = new LinkedList<>();
	        list1.add(42);
	        list1.add(43);
	        expectMsgEquals(new Batch(list1));
	        buncher.tell(new Queue(44), probe);

			buncher.tell(Flush.Flush , probe);
	        buncher.tell(new Queue(45), probe);
	        LinkedList<Object> list2 = new LinkedList<>();
	        list2.add(44);
	        expectMsgEquals(new Batch(list2));
	        LinkedList<Object> list3 = new LinkedList<>();
	        list3.add(45);
	        expectMsgEquals(new Batch(list3));
	        system.stop(buncher);
	      }};
	}
	






  • No labels