Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

언랭에서의 유한상태머신 디자인 컨셉에서 영향을 받았습니다.

http://erlang.org/documentation/doc-4.8.2/doc/design_principles/fsm.html

...

실시간배치잡 상태머신설계

실시간 배치 처리기를 다음과 같이 설계한다고 가정해봅시다.

  • 메시지를 일정량 모아뒀다가 내가 원하는 타이밍에 모아서 처리하고 싶다.
  • 특정시간이 지나면, 모은량과 상관없이 자동으로 처리하고 싶다.

처리기가 필요해 다음과 같은 요구사항이 있다고 가정해봅시다.

  • 실시간 메시지가 불규칙적으로 발생하며,때로는 대용량 메시지가 발생한다.
  • 항상 가동상태여야 하며 한꺼번에 너무 많은것을 처리하거나 너무 적게처리하지 않아야한다.
  • 메시지가 발생하지 않을때는 평온한 상태에서 자원점유를 많이하면 안된다 ( 보편적인 폴링방식이 불필요한 자원을 소모함)


구체적으로 사용이되는곳이 메시지가 불규칙적으로 생산된다고 가정하고, 메시지발생시마다

매번 DBwrite를 수행한다고 하면 DB의 성능저하를 일으키는 요소가될것입니다. 

...

어떠한 처리 함수를 100번호출하는것보다. 1번호출로 100개의데이터를 묶어서 처리하는것이 대부분 유용합니다.


항시 대기 배치 상태설계를 정의하면 상태 설계를 조금더 구체화하면 다음과 같습니다.

  • 상태값은 대기와 활동상태 두개를 가진다.
  • 대기 상태일때 메시지가 들어오면 활동상태로 깨어난다.
  • 특정시간이 지나면 대기상태로 진입하며, 현재받은 메시지를 모두처리함
  • 필요하면 임의로 큐를 비워서(Fluse) 배치처리가 되도록합니다.

기능이 복잡해 보이지만, 위 내용을 StateChartDiagram으로 옮기면 

아래와같이 단순한 처리기를 설계한것입니다.

Image Removed

구현-기반구조체정의

Expand
Code Block
languagejava
themeEmacs
import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.japi.pf.UnitMatch;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.io.Serializable;
import java.time.Duration;


public final class SetTarget  {
  private final ActorRef ref;

  public SetTarget(ActorRef ref) {
    this.ref = ref;
  }

  public ActorRef getRef() {
    return ref;
  }

  @Override
  public String toString() {
    return "SetTarget{" +
      "ref=" + ref +
      '}';
  }
}


public final class Queue  {
  private final Object obj;

  public Queue(Object obj) {
    this.obj = obj;
  }

  public Object getObj() {
    return obj;
  }

  @Override
  public String toString() {
    return "Queue{" +
      "obj=" + obj +
      '}';
  }
}


public final class Batch {
  private final List<Object> list;

  public Batch(List<Object> list) {
    this.list = list;
  }

  public List<Object> getList() {
    return list;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    Batch batch = (Batch) o;

    return list.equals(batch.list);
  }

  @Override
  public int hashCode() {
    return list.hashCode();
  }

  @Override
  public String toString() {
    final StringBuilder builder = new StringBuilder();
    builder.append( "Batch{list=");
    list.stream().forEachOrdered(e -> { builder.append(e); builder.append(","); });
    int len = builder.length();
    builder.replace(len, len, "}");
    return builder.toString();
  }
}


public enum Flush {
  Flush
}


//states
enum State {
Idle, Active
}

//state data
interface Data {
}

enum Uninitialized implements Data {
Uninitialized
}

final class Todo implements Data {
private final ActorRef target;
private final List<Object> queue;

public Todo(ActorRef target, List<Object> queue) {
 this.target = target;
 this.queue = queue;
}

public ActorRef getTarget() {
 return target;
}

public List<Object> getQueue() {
 return queue;
}

@Override
public String toString() {
 return "Todo{" +
   "target=" + target +
   ", queue=" + queue +
   '}';
}

public Todo addElement(Object element) {
 List<Object> nQueue = new LinkedList<>(queue);
 nQueue.add(element);
 return new Todo(this.target, nQueue);
}

public Todo copy(List<Object> queue) {
 return new Todo(this.target, queue);
}

public Todo copy(ActorRef target) {
 return new Todo(target, this.queue);
}
}





액터설계

기능이 복잡해 보이지만, 위 내용을 StateChartDiagram으로 옮기면 

아래와같이 단순한 처리기를 설계한것입니다.

Image Added

그리고 위와같이 설계된 그래프와 코드설계가 일치하는것이 핵심 포인트입니다.

위 상태 전이요소를 아래 구현코드에서 찾으면 이해가 쉬울것으로 보입니다.


Code Block
languagejava
themeEmacs
@Component
@Scope("prototype")
public class Buncher extends AbstractFSM<State, Data> {	
	final static  com.psmon.cachedb.actors.fsm.State Idle = com.psmon.cachedb.actors.fsm.State.Idle;
	final static  com.psmon.cachedb.actors.fsm.Uninitialized Uninitialized = com.psmon.cachedb.actors.fsm.Uninitialized.Uninitialized;	
	final static public com.psmon.cachedb.actors.fsm.State Active = com.psmon.cachedb.actors.fsm.State.Active;

	{
	    startWith(Idle, Uninitialized);
	    when(Idle,
	      matchEvent(SetTarget.class, Uninitialized.class,
	        (setTarget, uninitialized) ->
	          stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
	
	    onTransition(
	      matchState(Active, Idle, () -> {
	        // reuse this matcher
	        final UnitMatch<Data> m = UnitMatch.create(
	          matchData(Todo.class,
	            todo -> todo.getTarget().tell(new Batch(todo.getQueue()), getSelf())));
	        m.match(stateData());
	      }).
	      state(Idle, Active, () -> {/* Do something here */}));
	
	    when(Active, Duration.ofSeconds(1L),
	      matchEvent(Arrays.asList(Flush.class, StateTimeout()), Todo.class,
	        (event, todo) -> goTo(Idle).using(todo.copy(new LinkedList<>()))));
	
	    whenUnhandled(
	      matchEvent(Queue.class, Todo.class,
	        (queue, todo) -> goTo(Active).using(todo.addElement(queue.getObj()))).
	        anyEvent((event, state) -> {
	          log().warning("received unhandled request {} in state {}/{}",
	            event, stateName(), state);
	          return stay();
	        }));
	
	    initialize();
	  }
}

...

Code Block
languagejava
themeEmacs
	@Test
	public void contextLoads() {
		ActorSystem system = context.getBean(ActorSystem.class);
		SpringExtension ext = context.getBean(SpringExtension.class);				
		fsmTest(system,ext);		
		TestKit.shutdownActorSystem(system , scala.concurrent.duration.Duration.apply(5, TimeUnit.SECONDS ) ,true );		
	}
	
	protected void fsmTest(ActorSystem system,SpringExtension ext) {
	    new TestKit(system) {

		{
	        final ActorRef buncher =
	          system.actorOf(ext.props("buncher"));
	        
	        final ActorRef probe = getRef();

	        buncher.tell(new SetTarget(probe), probe);
	        buncher.tell(new Queue(42), probe);
	        buncher.tell(new Queue(43), probe);
	        LinkedList<Object> list1 = new LinkedList<>();
	        list1.add(42);
	        list1.add(43);
	        expectMsgEquals(new Batch(list1));
	        buncher.tell(new Queue(44), probe);

			buncher.tell(Flush.Flush , probe);
	        buncher.tell(new Queue(45), probe);
	        LinkedList<Object> list2 = new LinkedList<>();
	        list2.add(44);
	        expectMsgEquals(new Batch(list2));
	        LinkedList<Object> list3 = new LinkedList<>();
	        list3.add(45);
	        expectMsgEquals(new Batch(list3));
	        system.stop(buncher);
	      }};
	}
	

구현방법에 만족스럽지 못할수도 있으나, 이렇게 설계된 상태머신을 사용하는것은

아주 간결한 코드로, 서비스 코드내에서 적은 코드로 활용이 가능합니다.