Actor API중 Become/Unbecome으로 간단한 상태변환 액터 설계가가능하지만

조금더 복잡한 상태머신이 필요할시 AbstatractFSM 객체 활용이 가능합니다.


상태머신 디자인샘플


언랭에서의 유한상태머신 디자인 컨셉에서 영향을 받았습니다.

http://erlang.org/documentation/doc-4.8.2/doc/design_principles/fsm.html

실시간배치잡 상태머신설계

실시간 배치 처리기가 필요해 다음과 같은 요구사항이 있다고 가정해봅시다.

  • 실시간 메시지가 불규칙적으로 발생하며,때로는 대용량 메시지가 발생한다.
  • 항상 가동상태여야 하며 한꺼번에 너무 많은것을 처리하거나 너무 적게처리하지 않아야한다.
  • 메시지가 발생하지 않을때는 평온한 상태에서 자원점유를 많이하면 안된다 ( 보편적인 폴링방식이 불필요한 자원을 소모함)


구체적으로 사용이되는곳이 메시지가 불규칙적으로 생산된다고 가정하고, 메시지발생시마다

매번 DBwrite를 수행한다고 하면 DB의 성능저하를 일으키는 요소가될것입니다. 

1초란 시간은 네트워크에서 아주긴 시간입니다. 초당 100메시지 전송이 가능하다고 하면

어떠한 처리 함수를 100번호출하는것보다. 1번호출로 100개의데이터를 묶어서 처리하는것이 대부분 유용합니다.


상태 설계를 조금더 구체화하면 다음과 같습니다.

  • 상태값은 대기와 활동상태 두개를 가진다.
  • 대기 상태일때 메시지가 들어오면 활동상태로 깨어난다.
  • 특정시간이 지나면 대기상태로 진입하며, 현재받은 메시지를 모두처리함
  • 필요하면 임의로 큐를 비워서(Fluse) 배치처리가 되도록합니다.

구현-기반구조체정의

import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.japi.pf.UnitMatch;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.io.Serializable;
import java.time.Duration;


public final class SetTarget  {
  private final ActorRef ref;

  public SetTarget(ActorRef ref) {
    this.ref = ref;
  }

  public ActorRef getRef() {
    return ref;
  }

  @Override
  public String toString() {
    return "SetTarget{" +
      "ref=" + ref +
      '}';
  }
}


public final class Queue  {
  private final Object obj;

  public Queue(Object obj) {
    this.obj = obj;
  }

  public Object getObj() {
    return obj;
  }

  @Override
  public String toString() {
    return "Queue{" +
      "obj=" + obj +
      '}';
  }
}


public final class Batch {
  private final List<Object> list;

  public Batch(List<Object> list) {
    this.list = list;
  }

  public List<Object> getList() {
    return list;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    Batch batch = (Batch) o;

    return list.equals(batch.list);
  }

  @Override
  public int hashCode() {
    return list.hashCode();
  }

  @Override
  public String toString() {
    final StringBuilder builder = new StringBuilder();
    builder.append( "Batch{list=");
    list.stream().forEachOrdered(e -> { builder.append(e); builder.append(","); });
    int len = builder.length();
    builder.replace(len, len, "}");
    return builder.toString();
  }
}


public enum Flush {
  Flush
}


//states
enum State {
Idle, Active
}

//state data
interface Data {
}

enum Uninitialized implements Data {
Uninitialized
}

final class Todo implements Data {
private final ActorRef target;
private final List<Object> queue;

public Todo(ActorRef target, List<Object> queue) {
 this.target = target;
 this.queue = queue;
}

public ActorRef getTarget() {
 return target;
}

public List<Object> getQueue() {
 return queue;
}

@Override
public String toString() {
 return "Todo{" +
   "target=" + target +
   ", queue=" + queue +
   '}';
}

public Todo addElement(Object element) {
 List<Object> nQueue = new LinkedList<>(queue);
 nQueue.add(element);
 return new Todo(this.target, nQueue);
}

public Todo copy(List<Object> queue) {
 return new Todo(this.target, queue);
}

public Todo copy(ActorRef target) {
 return new Todo(target, this.queue);
}
}





액터설계

기능이 복잡해 보이지만, 위 내용을 StateChartDiagram으로 옮기면 

아래와같이 단순한 처리기를 설계한것입니다.

그리고 위와같이 설계된 그래프와 코드설계가 일치하는것이 핵심 포인트입니다.

위 상태 전이요소를 아래 구현코드에서 찾으면 이해가 쉬울것으로 보입니다.


@Component
@Scope("prototype")
public class Buncher extends AbstractFSM<State, Data> {	
	final static  com.psmon.cachedb.actors.fsm.State Idle = com.psmon.cachedb.actors.fsm.State.Idle;
	final static  com.psmon.cachedb.actors.fsm.Uninitialized Uninitialized = com.psmon.cachedb.actors.fsm.Uninitialized.Uninitialized;	
	final static public com.psmon.cachedb.actors.fsm.State Active = com.psmon.cachedb.actors.fsm.State.Active;

	{
	    startWith(Idle, Uninitialized);
	    when(Idle,
	      matchEvent(SetTarget.class, Uninitialized.class,
	        (setTarget, uninitialized) ->
	          stay().using(new Todo(setTarget.getRef(), new LinkedList<>()))));
	
	    onTransition(
	      matchState(Active, Idle, () -> {
	        // reuse this matcher
	        final UnitMatch<Data> m = UnitMatch.create(
	          matchData(Todo.class,
	            todo -> todo.getTarget().tell(new Batch(todo.getQueue()), getSelf())));
	        m.match(stateData());
	      }).
	      state(Idle, Active, () -> {/* Do something here */}));
	
	    when(Active, Duration.ofSeconds(1L),
	      matchEvent(Arrays.asList(Flush.class, StateTimeout()), Todo.class,
	        (event, todo) -> goTo(Idle).using(todo.copy(new LinkedList<>()))));
	
	    whenUnhandled(
	      matchEvent(Queue.class, Todo.class,
	        (queue, todo) -> goTo(Active).using(todo.addElement(queue.getObj()))).
	        anyEvent((event, state) -> {
	          log().warning("received unhandled request {} in state {}/{}",
	            event, stateName(), state);
	          return stay();
	        }));
	
	    initialize();
	  }
}


액터테스트

	@Test
	public void contextLoads() {
		ActorSystem system = context.getBean(ActorSystem.class);
		SpringExtension ext = context.getBean(SpringExtension.class);				
		fsmTest(system,ext);		
		TestKit.shutdownActorSystem(system , scala.concurrent.duration.Duration.apply(5, TimeUnit.SECONDS ) ,true );		
	}
	
	protected void fsmTest(ActorSystem system,SpringExtension ext) {
	    new TestKit(system) {

		{
	        final ActorRef buncher =
	          system.actorOf(ext.props("buncher"));
	        
	        final ActorRef probe = getRef();

	        buncher.tell(new SetTarget(probe), probe);
	        buncher.tell(new Queue(42), probe);
	        buncher.tell(new Queue(43), probe);
	        LinkedList<Object> list1 = new LinkedList<>();
	        list1.add(42);
	        list1.add(43);
	        expectMsgEquals(new Batch(list1));
	        buncher.tell(new Queue(44), probe);

			buncher.tell(Flush.Flush , probe);
	        buncher.tell(new Queue(45), probe);
	        LinkedList<Object> list2 = new LinkedList<>();
	        list2.add(44);
	        expectMsgEquals(new Batch(list2));
	        LinkedList<Object> list3 = new LinkedList<>();
	        list3.add(45);
	        expectMsgEquals(new Batch(list3));
	        system.stop(buncher);
	      }};
	}
	

구현방법에 만족스럽지 못할수도 있으나, 이렇게 설계된 상태머신을 사용하는것은

아주 간결한 코드로, 서비스 코드내에서 적은 코드로 활용이 가능합니다.


배치를 위한 DBWrite 구현

@Component("DBWriteActor")
@Scope("prototype")
public class DBWriteActor extends AbstractActor{
    private final LoggingAdapter log = Logging.getLogger(getContext().system(), "DBWriteActor");
    
	@Autowired
	ItemBuyLogRepository itemBuyLogRepository;
    
    @Override
    public Receive createReceive() {
      return receiveBuilder()
        .match(  com.psmon.cachedb.actors.fsm.Batch.class , s -> {
          log.info("Received ItemBuyLog message: {}", s.toString()  );                              
          List<ItemBuyLog> insertList = new ArrayList<>();          
          s.getList().forEach( item-> {
        	  ItemBuyLog itemLog = (ItemBuyLog)item;
        	  insertList.add(itemLog);        	  
          });
          itemBuyLogRepository.save( insertList );        	
        })
        .matchAny(o -> log.info("received unknown message - {}", o.getClass().getName()  ))
        .build();
    }
}

DB에 Row를 저장하기위해 건바이건마다 트랜잭션이 발생하는 방식이아닌

JPA를 활용하여 리스트를 그대로 저장할수 있는 인터페이스와 연동되었습니다.

JPA(ORM)는 DB를 데이터중심에서 메시지중심방식으로 사용하기위한 유용한 DB개발방식입니다.

추가참조: 02-DBHANDLE with JPA

활용코드

	protected void fsmDBWriteTest(ActorSystem system,SpringExtension ext) {
	    new TestKit(system) {
		{
	        final ActorRef buncher =
	          system.actorOf(ext.props("buncher"));
	        
	        final ActorRef dbWriter =
	  	          system.actorOf(ext.props("DBWriteActor"));
	        
	        final ActorRef probe = getRef();
	        
	        buncher.tell(new SetTarget(dbWriter), dbWriter);
	        
			for(int i=0;i<200;i++) {				
				String buyTime = String.format("2018-%02d-%02d", i%10+1,i%20+1);			
				UserInfo buyUser = userInfoRepository.findAll().get(i%50);
				GameItem buyItem = gameItemRepository.findAll().get(i%50);			
				ItemBuyLog addBuyLog = new ItemBuyLog(buyTime, buyItem, buyUser);				
				buncher.tell(new Queue(addBuyLog), dbWriter);
			}
			
			buncher.tell(Flush.Flush , dbWriter);
			
			 system.stop(buncher);
	      }};
	}

원하는 처리량부분에 Flush를 수행하면 현재까지 수집된 Row를 List화하여 DB에 저장시도를 할수가 있습니다.

지정된 특정시간이 지나면, 수집된량에 상관없이 Flush를 자동 수행하기때문에 유입된 메시지를 모두 처리할수가 있습니다.




  • No labels
Write a comment…