SpringBoot와 AKKA의 만남

SpringBoot VS Playframework  대결구도도 상당히 흥미롭지만

Akka및 Spark와같은 분석처리를 위해 Scala 사용이 웹요소에서 더 중요하다고 하면

애시당초 탑재되어 같이 작동하는 Playframework를 추천을 합니다.

반대로 SpringBoot의 구성 사용요소가 훨씬많으며...,  Spring에서 액터모델을 대체할만한

요소를 찾지못했을시 Akka의 Actor및 Stream을 부분적으로 활용할때 권장되는 방식입니다.

스프링부트와 아카상호 운영하기

자바- 웹 프레임웍 진영에 있는 Spring Boot에 AKKA를 내장시키는 방법을 알아보겠습니다.

AKKA 준비과정은 이것보다 훨씬더 간단하지만 SpringBoot에 AKKA라는 외부툴킷을 내재시키고

상호운영하기위해서는 SpringBoot에서 제안하는 표준적인 방법으로 확장시켜야하며 약간의 수고가 필요합니다.

Sprint Boot 을 사용하는 방법은 AKKA주제와 별개로 Spring-BOOT 정리되었습니다.


Spring Boot 1.5.9 와 Akka 2.4.8 버젼에서 테스트 되었으나

문서작성 기간중에, 버젼이 업데이트 될수있습니다.(몇가지 유용한 Akka 유틸툴들이 상위버젼에서 추가됨)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="" xmlns:xsi=""


<description>Demo project for Spring Boot</description>

<relativePath/> <!-- lookup parent from repository -->



.....Boot Start 생략...






Application Layout

  • 1 - Spring Boot의 진입점과 액터를 사용하는 샘플코드가 있습니다.
  • 2 - 개발자가 설계한 각종 액터들을 집합을 시킬것입니다. 여기서 TestActor는 단순하게 받은 메시지 출력을 합니다.
  • 3 - AkkaSystem의 초기화된 싱글톤 객체를 얻기위해서 SpringApplication에 빈을 등록 해야하며, Spring이 Akka의 라이프사이클을 관리해주게됩니다.
  • 4 -  Akka에서는 ActorOf라는 객체로 액터를 등록하며, Spring에서는 객체를 Bean으로  관리합니다. 이 두가지를 동일한 인터페이스로 작동하게하는 유틸클래스입니다.
  • 5 - 로그환경 통합및 Spring( 의 외부설정과 Akka(application.confg) 의 외부설정을 할수 있는곳입니다.  

Step1 - Akka 사용 준비하기 - (3,4번)

한번만 설정을 하면 이후 거의 변경될일이 없습니다.

Spring Boot에 기본으로 포함되지 않는 외부 오픈라이브러리를  Boot와 연동하는 주제이며

SpringBoot의 기본 구조를 익히는데 도움이 됩니다. 

package com.psmon.cachedb.extension;

import org.springframework.context.ApplicationContext;

 * An actor producer that lets Spring create the Actor instances.
public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext applicationContext;
    private final String actorBeanName;
    private final Object[] args;

    public SpringActorProducer(ApplicationContext applicationContext,
        String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
        this.args = null;
    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object... args) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
        this.args = args;

    public Actor produce() {
        return args == null ? 
              (Actor) applicationContext.getBean(actorBeanName):
              (Actor) applicationContext.getBean(actorBeanName, args);

    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
package com.psmon.cachedb.extension;

import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

 * Extension to tell Akka how to create beans via Spring.
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

     * Used to initialize the Spring application context for the extension.
    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;

     * Create a Props for the specified actorBeanName using the
     * SpringActorProducer class.
    public Props props(String actorBeanName, Object... args) {
        return (args != null && args.length > 0) ?
                     actorBeanName, args) :
package com.psmon.cachedb.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import com.psmon.cachedb.extension.SpringExtension;


@ComponentScan(basePackages = { "",
    "com.psmon.cachedb.actors", "com.psmon.cachedb.extension" })
public class ApplicationConfiguration {

    private ApplicationContext applicationContext;

    private SpringExtension springExtension;

	private Environment env;

    public ActorSystem actorSystem() {

        ActorSystem system = ActorSystem
            .create("AkkaTestApp", akkaConfiguration());
        return system;

	public Config akkaConfiguration() {

		String akkaip = env.getProperty("akka.remote.netty.tcp.ip");
		String port = env.getProperty("akka.remote.netty.tcp.port");

        // 기존 Akka설정과(application.conf) + 최종계산된 Spring의 설정(를 머징시키는방법
        // AKKA에서 필요한 기능들 설정은 충분히한후, 변경요소를 Spring설정을 노출하여 변경하는 방식
		return ConfigFactory.load().withValue("akka.remote.netty.tcp.ip", ConfigValueFactory.fromAnyRef(akkaip))
				.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port));


	public void doSomethingAfterStartup() {

Step2 - 로그환경 준비하기 ( 5 )

동시성 메시지 처리는 로그파악이 중요함으로, 꼭 설정을한후 사용을 합니다.

그래야 초반부터 System.out.print(...) 와같은 코드를 사용하지 않고 로그기반으로

Trace를 하는 습관을 가지게 됩니다. 여기서는 logback 기본 콘솔모드 설정을 사용하였으며

logback은 다양한 방법의 로그방식을 추가할수 있는 유용한 로그 라이브러리 입니다.


akka {

  loggers = ["akka.event.Logging$DefaultLogger"]

  loglevel = "INFO"

  stdout-loglevel = "ERROR"



  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
      <pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>

  <root level="INFO">
    <appender-ref ref="STDOUT" />


Step2.1 - 로그변경(Slf4jLogger)

# application.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  log-config-on-start = on

# pom.xml

# logback-spring.xml
<logger name="" level="error" />
<logger name="akka" level="error" />

Step3. 액터설계하기

받은 메시지를 단순하게 출력을 하는 TestActor입니다.

OOP와 무슨차이냐구요?  메시지 기반 설계는 로컬에서만 사용시 복잡도가 증가할수는 있으나

액터로 설계된것은 스레드세이프하고 클러스터로 확장될수 있는 가능성을 열어줍니다. 

 액터의 특징 참고자료 : Actor

package com.psmon.cachedb.actors;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import akka.event.Logging;
import akka.event.LoggingAdapter;

@Scope("prototype")  //액터객체는 수십만개를 생성해도 가볍습니다. 힙영역에 원하는 만큼 생성해야함으로 싱글톤으로 작동되는것을 방지합니다.(스프링특성)
public class TestActor extends UntypedActor {
    private final LoggingAdapter log = Logging
            .getLogger(getContext().system(), "TestActor");

    public void onReceive(Object message) throws Exception {
    	if(message instanceof String) {"Incommessage {}", message);  
    	}else {"Unhandle Message {}", message);    		

Step.4 액터사용하기

package com.psmon.cachedb;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

import com.psmon.cachedb.extension.SpringExtension;

import akka.event.Logging;
import akka.event.LoggingAdapter;

public class CachedbApplication {

	public static void main(String[] args) {
		ApplicationContext context =, args);
        ActorSystem system = context.getBean(ActorSystem.class);

        final LoggingAdapter log = Logging.getLogger(system, "Application");"Starting up");

        SpringExtension ext = context.getBean(SpringExtension.class);
        ActorRef testActor = system.actorOf(ext.props("testActor"));
        testActor.tell("ready spring boot",null);        
  final Materializer materializer = ActorMaterializer.create(system);
  final ActorRef throttler =
    Source.actorRef(1000, OverflowStrategy.dropNew())
      .throttle(1,  FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
      .to(Sink.actorRef(testActor, NotUsed.getInstance() ))
  for(int i=0;i<100;i++) {
  	throttler.tell(String.format("fasmsg to slow %d", i), ActorRef.noSender());       	

100개의 메시지가 한꺼번에 들어왔다고 가정해보자....API가 되었건, DB가 되었건 ...

보통 동시간에 100개의 요청이 히트가 되어 불안정을 야기한다고 하면 이것을 안정적으로 공급할 장치가 필요합니다.

초당 처리제한 및 특정개수가 넘을시 억제방식등의 설계가 가능하며 밸브조절을 통한 역압력 장치라고도 불리며

실제 액체의 유입량을 제어하는 장치로부터 아이디어를 얻어 구현된 기능입니다.

AKKA는 액터모델로 시작고, AKKAStream으로 마무리를 하는것같습니다.

GraphDSL은 비동기 동시성처리에 있어서 멋진기능들을 제공해줍니다.


스프링 Context를 가지고 올수 있는곳 어느곳이나  AKKA사용을 위한 ActorSystem을 획득할수 있습니다.

ActorSystem system = context.getBean(ActorSystem.class);

객체명(TestActor = testActor(카멜) )을 통해 우리가 설계한 액터를 생성할수가 있습니다.

ActorRef testActor = system.actorOf(ext.props("testActor"));

//ActorRef(액터참조)는 로컬 어디든 

액터는 Enpoint 주소 처럼 만들고 접근할수 있습니다. 우리의 로컬로직을 리모트로 확장가능하며 이것은 Rest보다 수백배 빠를수 있습니다. 

   //액터 생성... 
  ActorRef testActor = system.actorOf(ext.props("testActor"),"service1");
  //로컬 주소를 통한 액터 선택
  ActorSelection testActor2 = system.actorSelection("user/service1");
  //원격 주소를 통한 액터 선택
  ActorSelection testActorRemote = system.actorSelection("akka.tcp://app@localhost:2552/user/service1");
  //참조를 통한 전송 ( 로컬 객체관리에 유리 -메모리 전송 )
  testActor.tell("ready spring boot",null);
  //액터 셀렉션을 통한 전송
  testActor2.tell("ready spring boot -again", null);
  //리모트를 통한 전송 ( 네트워크 전송 )
  testActorRemote.tell("ready spring boot -again too", null);

리모트로 변환하기위해서, 몇가지 설정만 하면됩니다. 

이것은 TCP Server를 만들어서 전송기능을 만드는것보다 훨씬 안정적이고 쉬우며

RestAPI를 만들어 전송을 시키는것보다도 구현코드는 더 간단하며 성능은 훨씬 좋습니다.

KAFKA와 같이 추가적인 관리포인트를 주지않고 이미 운영중인 어플리케이션에 전송기능 탑재가 용이합니다.

하지만 더 중요한것은  거의 모든 스트리머(카프카,래빗MQ,엘라스틱서치,기타 수많은 메시지큐시스템)와

일괄적인 방법으로 상호운영이 용이해집니다.

라운드 로빈같은 특정 라우티까지, 내장시킬수 있는것은 보너스입니다.

참고 :  

액터의 참조명을 통해, 블락킹이 없는 메시지 처리가 가능합니다.

testActor.tell("ready spring boot",null);


[INFO] [03/25/2018 20:24:00.055] [main] [Application] Starting up
[INFO] [03/25/2018 20:24:00.068] [] [TestActor] Incommessage ready spring boot

실행후 위 로그를 확인하였다면.. Spring과 Akka의 연합작전이 성공한것입니다.


웹 캐싱활용

캐싱은 웹기술뿐만 아니라, 디스크/메모리가 동시에 탑재된 여러가지 컴퓨터장치에서

성능을 위해 활용되는 기술입니다. 디스크는 상대적으로 속도가 느리기 때문에

반복적으로 발생하는 변경없는 데이터의 Read에대해 , 메모리가 일정시간 저장하고 있다가

디스크또는 네트워크에 부하를 주지않고 빨리 반환하는 기술이라고 볼수 있으며 응답이 중요한 웹에서도 중요한 요소입니다.

캐시 기초개념

일반적 인 캐시처리 방식

Write-through with no-write-allocationWrite-back with write-allocation

참고 :

캐시기능자체는 복잡해보이지만, 모듈을 통한  캐시기능 사용자체는 간단합니다. 

더욱이 사용자는 이것이 캐시처리가 된것인지 아닌지? 모르게 은닉을 하는게 목적입니다.

사용자가 뭔가 정보갱신이 안되고 있다라고 의심을 한다고하면 캐싱기능을 검토해야할것입니다.

웹응답을 높이기위한 CACHE종류

  • 브라우져 레벨의 캐시기능 ( 동일 요청에 대해 일정시간 이내 요청없이 스토어된 데이터로 처리 )
  • Proxy,장비 캐시기능(Proxy,라우터등에서 지정된 시간이내 동일 요청에 대해 메모리 반환처리)
  • 웹 서비스(서버) 레벨에서의 캐시구현 ( 직접 구현)

여기서는 좀더 커스텀하고/디테일한 캐싱처리가 가능한 웹서비스에서 캐싱처리를

직접 구현하는 방법을 살펴보겠습니다.

웹서버 캐시처리방법

  • 자신의 노드에서만 메모리 캐시처리
  • Redis등 메모리DB 사용하여, 전체 분산노드에 캐시기능 확장

간단한 캐시기능 적용(자신의노드에서만)

활용할 캐시 라이브러리



Application에 @EnableCaching 지정
public class Application { 


package com.example.demo.data2;

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

public class SimpleUserRepository {	
	public String getByIsbn(String isbn) throws Exception {
	 long time = 3000L; 
	 return "Some String-" +isbn;

단일노드에서에 자체 캐시처리는 아주단순합니다.  

단지 캐시처리할 대상을 Cacheble 어노테이션으로 묶어주고, 네이밍을 하면됩니다. ( 네이밍에따라 다른 캐시정책 설정가능)

getByIsbn 함수는 기본적으로 IO를 발생하는 느린 함수다라고 가정했을시(3초 인위적 딜레이발생)

인자값에따라 아래와같이 시간이 소모되게 됩니다.

  • getByIsbn('1') - 3초걸림

  • getByIsbn('1') - 빠르게수행
  • getByIsbn('1') - 빠르게수행
  • getByIsbn('2') - 3초걸림
  • getByIsbn('2') - 빠르게수행
  • getByIsbn('1') - 빠르게수행

차이점은, 최초검색 옵션일때만 느리고  동일한 검색옵션이 재사용되는경우

이미 캐싱되어 반환할 값을 가지고 있기때문에 빠르게 메모리 반환을 하는것입니다.

여기서는 기본캐싱옵션을 사용하였지만, 캐싱 옵셔에따라 다양한 캐시전략을 수행할수 있습니다.

  • 동일요청 캐싱시간을 최대 몇초유지할지?
  • 전체 캐시데이터를 몇개까지 최대 보관할지?
  • 데이터 변경시 캐싱데이터를 무효화 시킬지?
  • 캐싱에 관여하는 인자값 키처리를 별도로 지정할지?

더 복잡한 캐시처리

SQL 하나를 호출하기 위한 인자값이 7개정도가 되며 , RESTAPI를 통해 서비스를 한다고

가정해봅시다. 다양한 변수가 생길수 있는 상황에서

DB를 덜 호출하기 위해서 기본캐시기능에서 몇가지 요소가 필요합니다.

  • 여러개의 인자값 조합을 캐시처리기능이 구분 처리 해야합니다.
  • 휘발을 시키는 시간조건 설정이 가능해야합니다. 예를들면 하루동안 변경되지 않는 데이터이면 하루동안 유지할수 있습니다.
  • 캐시타임동안 쌓이게될 데이터 총량을 예측을 할수 없기때문에, 최대 개수설정이 가능해야합니다. (메모리풀방지)

Caffeine 캐시준비

spring.cache.cache-names: instruments, directory
spring.cache.caffeine.spec: maximumSize=1000, expireAfterAccess=10s   // 초(s) 분(m) 일(d)

# pom.xml

<!-- caching provider -->

JPA Repository와 연동이되어 위 요구사항을 충족 시킬수 있는 캐시모듈입니다.

instruments, directory 두개의 캐시공간을 사용예정이란것을 정의한것이며

10초동안 각각 다른 데이터 종류인 1000개의 캐시를 유지하겠다란 설정입니다.

이 설정으로, 10초동안 동일요청에대해 빠른 응답처리가 가능해집니다.  


public class InputOpt {
	private int ecode;
	public int getEcode() {
		return hashCode();
    public int hashCode() {
		HashCodeBuilder builder = new HashCodeBuilder();
        return builder.toHashCode();

검색 인자값의 다양성에따라 각각 고유하다란것을 해시코드재정의를 통해 활용할수 있습니다.

HashCodeBuilder를 통해 간결하게 작성이 가능합니다.  ecode는 단지 유니크하다라는 식별값이며

이 식별자는 캐시를 할지 말지 판단하는 중요한 판별기능에 요소로 사용이됩니다.

캐시 서비스 정의

//인터페이스 정의
public interface GoodsDataService {	
	Iterable<GoodsDataRC> findAll( BooleanBuilder searchOpt , Pageable pageRequest , InputOpt inputopt );	

----- 파일분리
//구 현
@CacheConfig(cacheNames = {"directory", "instruments"})
public class GoodsDataServiceImpl implements GoodsDataService{
	static final Logger logger = LoggerFactory.getLogger(GoodsDataService.class);	
	private GoodsDataRCRepo	goodsDataRCRepo; // 이것은 읽기전용의 JPA Repository
    @Cacheable( value="instruments", key="#inputopt.ecode" )
    public Iterable<GoodsDataRC> findAll( BooleanBuilder searchOpt , Pageable pageRequest , InputOpt inputopt ) { "hash == %d",inputopt.hashCode() );   	
        return goodsDataRCRepo.findAll(searchOpt,pageRequest);

DB를 탐색하는 findAll에 캐시기능을 부여하였습니다. 우리가 지정한 instruments 이름의 캐시공간에

우리가 정의한 inputopt.ecode 를통해, 중복처리에대해 캐시 처리가 될것입니다.  

캐시 DB사용하기

	GoodsDataService service;		//for cache db
	Iterable<GoodsDataRC> dbResult = service.findAll(searchOpt,pageRequest,inputOpt);

캐시를 사용하기위해 준비해야할게 몇가지 있었지만, 사용하는 코드쪽에서 

기존 JPA함수를 사용하여 DB를 직접호출하는 코드와 크게 다를것이 없어보입니다.

단지  Api 옵션을 판별하기 위해 우리가 정의한 inputOpt를 다시 전달했을뿐입니다.

이것을 어떻게 확인하느냐?  JPA호출시 sql문을 보여주는 옵션을 켜고 테스트를 하면

수많은 중복요청에대해 특정시간내에 단한번만 sql문을 호출할것입니다.

참고: EndPoint에 캐시설정

캐시기능을 여러노드에 공통적으로 확장하기

동일한 DB요청을 각각 다른 사용자가 동일시간(+-5초)에 했다고 가정해봅시다.

  • A사용자 최근 게시물 100개 조회 ( 0번노드)
  • B사용자 최근 게시물 100개 조회 ( 1번노드)
  • C사용자 최근 게시물 100개 조회 ( 2번노드)

웹서비스에는 노드밸런스란 기능이있어서 사용자를 각각 다른노드로 분배를시킵니다.

이것은 대부분의 웹서비스가 사용하는 방식입니다.

이와 같은 상황에서 문제가 무엇일까요? 각 노드에 DB호출수를 줄이기위해

각각의 서버에 캐시기능을 적용하였지만, 캐시기능은 아무런 쓸모없이

설정한 캐시시간을 무시하고, 동일한 데이터 조회를 연속으로 하게됩니다.

이때 필요한 캐시기능이, 여러노드에서도 중복호출을 막는 클러스터 캐시입니다.

형태소 분석기 웹소켓 서비스

플랫폼 제약없이, 한글형태소 분석기를 웹소켓으로 미들웨어에서 실시간으로 사용하고자 할때

내가 구현못하는 고수준의 엔진을통해 그것에서 나온 결과를 웹또는 어떠한곳에 실시간 제공하고자 할때 활용할수 있습니다.

풀텍스트 인덱싱단계에서 사용하여 올바른 검색이되게 하거나... 자연어의 검색시도를 분해하여 사용자의 의도를 파악할때 사용될수가 있습니다.

웹으로 이용하기:

웹소켓인터페이스로 이용하기: ws://

웹소켓 메시지 설계 대응 자바용 웹소켓


웹소켓을 로드밸런싱 :

Full소스위치 :  

올인원 스마트 배치

도메인 목표는없고, 설계상의 기능목표만 있음

기능 목표:

  • 다양한 잡(배치/실시간/크롤)에의한 데이터 수집및 집계처리
  • 분산 데이터 동일성유지
  • 실시간 반영 가능구조
  • 저장소의 저렴한 분산화를 통한 대량 서비스


  • 복잡도에비해 성능보장안될수도 있음
  • 분산된 저장소의 일관성 유지가 어려움

연동예정 Spec

  • Spring Batch
  • JPA Curd Repository
  • H2 Local DB
  • Akka Cluster Actor
  • Kafka/Spark/Hadoob 등

개발 예정 Link:

BigData with RealTime