스프링부트와 아카상호 운영하기

자바- 웹 프레임웍 진영에 있는 Spring Boot에 AKKA를 내장시키는 방법을 알아보겠습니다.

AKKA 준비과정은 이것보다 훨씬더 간단하지만 SpringBoot에 AKKA라는 외부툴킷을 내재시키고

상호운영하기위해서는 SpringBoot에서 제안하는 표준적인 방법으로 확장시켜야하며 약간의 수고가 필요합니다.

Sprint Boot 을 사용하는 방법은 AKKA주제와 별개로 Spring-BOOT 정리되었습니다.


메이븐설정

Spring Boot 1.5.9 와 Akka 2.4.8 버젼에서 테스트 되었으나

문서작성 기간중에, 버젼이 업데이트 될수있습니다.(몇가지 유용한 Akka 유틸툴들이 상위버젼에서 추가됨)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.psmon</groupId>
<artifactId>cachedb</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cachedb</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<akka.version>2.4.8</akka.version>
</properties>

<dependencies>

.....Boot Start 생략...

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.11</artifactId>
<version>0.19</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

</dependencies>



Application Layout


  • 1 - Spring Boot의 진입점과 액터를 사용하는 샘플코드가 있습니다.
  • 2 - 개발자가 설계한 각종 액터들을 집합을 시킬것입니다. 여기서 TestActor는 단순하게 받은 메시지 출력을 합니다.
  • 3 - AkkaSystem의 초기화된 싱글톤 객체를 얻기위해서 SpringApplication에 빈을 등록 해야하며, Spring이 Akka의 라이프사이클을 관리해주게됩니다.
  • 4 -  Akka에서는 ActorOf라는 객체로 액터를 등록하며, Spring에서는 객체를 Bean으로  관리합니다. 이 두가지를 동일한 인터페이스로 작동하게하는 유틸클래스입니다.
  • 5 - 로그환경 통합및 Spring( application.properties) 의 외부설정과 Akka(application.confg) 의 외부설정을 할수 있는곳입니다.  


Step1 - Akka 사용 준비하기 - (3,4번)

한번만 설정을 하면 이후 거의 변경될일이 없습니다.

Spring Boot에 기본으로 포함되지 않는 외부 오픈라이브러리를  Boot와 연동하는 주제이며

SpringBoot의 기본 구조를 익히는데 도움이 됩니다. 

extension.SpringActorProducer
package com.psmon.cachedb.extension;

import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

/**
 * An actor producer that lets Spring create the Actor instances.
 */
public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext applicationContext;
    private final String actorBeanName;
    private final Object[] args;

    public SpringActorProducer(ApplicationContext applicationContext,
        String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
        this.args = null;
    }
    
    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object... args) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
        this.args = args;
    }

    @Override
    public Actor produce() {
        return args == null ? 
              (Actor) applicationContext.getBean(actorBeanName):
              (Actor) applicationContext.getBean(actorBeanName, args);
    }

    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }
}
extenstion.SpringExtension
package com.psmon.cachedb.extension;

import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
 * Extension to tell Akka how to create beans via Spring.
 */
@Component
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

    /**
     * Used to initialize the Spring application context for the extension.
     */
    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Create a Props for the specified actorBeanName using the
     * SpringActorProducer class.
     */
    public Props props(String actorBeanName, Object... args) {
        return (args != null && args.length > 0) ?
             Props.create(SpringActorProducer.class,
                     applicationContext,
                     actorBeanName, args) :
             Props.create(SpringActorProducer.class,
                     applicationContext,
                     actorBeanName);
     }   
}
config.ApplicationConfiguration
package com.psmon.cachedb.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import com.psmon.cachedb.extension.SpringExtension;

import akka.actor.ActorSystem;

@Configuration
@Lazy
@ComponentScan(basePackages = { "com.psmon.cachedb.services",
    "com.psmon.cachedb.actors", "com.psmon.cachedb.extension" })
public class ApplicationConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private SpringExtension springExtension;


	@Autowired
	private Environment env;

    @Bean
    public ActorSystem actorSystem() {

        ActorSystem system = ActorSystem
            .create("AkkaTestApp", akkaConfiguration());
        springExtension.initialize(applicationContext);        
        return system;
    }

	@Bean
	public Config akkaConfiguration() {

		String akkaip = env.getProperty("akka.remote.netty.tcp.ip");
		String port = env.getProperty("akka.remote.netty.tcp.port");

        // 기존 Akka설정과(application.conf) + 최종계산된 Spring의 설정(appplication.properties)를 머징시키는방법
        // AKKA에서 필요한 기능들 설정은 충분히한후, 변경요소를 Spring설정을 노출하여 변경하는 방식
		return ConfigFactory.load().withValue("akka.remote.netty.tcp.ip", ConfigValueFactory.fromAnyRef(akkaip))
				.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port));

	}

	@EventListener(ApplicationReadyEvent.class)
	public void doSomethingAfterStartup() {
	}
	
}


Step2 - 로그환경 준비하기 ( 5 )

동시성 메시지 처리는 로그파악이 중요함으로, 꼭 설정을한후 사용을 합니다.

그래야 초반부터 System.out.print(...) 와같은 코드를 사용하지 않고 로그기반으로

Trace를 하는 습관을 가지게 됩니다. 여기서는 logback 기본 콘솔모드 설정을 사용하였으며

logback은 다양한 방법의 로그방식을 추가할수 있는 유용한 로그 라이브러리 입니다.

#application.propertis
logging.config=classpath:logback-spring.xml


#application.conf
akka {

  loggers = ["akka.event.Logging$DefaultLogger"]

  loglevel = "INFO"

  stdout-loglevel = "ERROR"

}


#logback-spring.xml
<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>

</configuration>



Step2.1 - 로그변경(Slf4jLogger)

# application.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  log-config-on-start = on
}


# pom.xml
<dependency>
	<groupId>com.typesafe.akka</groupId>
	<artifactId>akka-slf4j_2.12</artifactId>
	<version>${akka.version}</version>
</dependency>	


# logback-spring.xml
<logger name="org.springframework.data" level="error" />
<logger name="akka" level="error" />



Step3. 액터설계하기

받은 메시지를 단순하게 출력을 하는 TestActor입니다.

OOP와 무슨차이냐구요?  메시지 기반 설계는 로컬에서만 사용시 복잡도가 증가할수는 있으나

액터로 설계된것은 스레드세이프하고 클러스터로 확장될수 있는 가능성을 열어줍니다. 

 액터의 특징 참고자료 : Actor

package com.psmon.cachedb.actors;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

@Component
@Scope("prototype")  //액터객체는 수십만개를 생성해도 가볍습니다. 힙영역에 원하는 만큼 생성해야함으로 싱글톤으로 작동되는것을 방지합니다.(스프링특성)
public class TestActor extends UntypedActor {
    private final LoggingAdapter log = Logging
            .getLogger(getContext().system(), "TestActor");

    @Override
    public void onReceive(Object message) throws Exception {
    	
    	if(message instanceof String) {
    		log.info("Incommessage {}", message);  
    		
    	}else {
    		log.info("Unhandle Message {}", message);    		
    	}    	
    }
}


Step.4 액터사용하기

package com.psmon.cachedb;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;

import com.psmon.cachedb.extension.SpringExtension;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.event.Logging;
import akka.event.LoggingAdapter;

@SpringBootApplication
public class CachedbApplication {

	public static void main(String[] args) {
		ApplicationContext context = SpringApplication.run(CachedbApplication.class, args);
		
        ActorSystem system = context.getBean(ActorSystem.class);

        final LoggingAdapter log = Logging.getLogger(system, "Application");

        log.info("Starting up");

        SpringExtension ext = context.getBean(SpringExtension.class);
                
        ActorRef testActor = system.actorOf(ext.props("testActor"));
        
        testActor.tell("ready spring boot",null);        
	}
}
보너스코드
  final Materializer materializer = ActorMaterializer.create(system);
  
  final ActorRef throttler =
    Source.actorRef(1000, OverflowStrategy.dropNew())
      .throttle(1,  FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
      .to(Sink.actorRef(testActor, NotUsed.getInstance() ))
      .run(materializer);
  	        
  for(int i=0;i<100;i++) {
  	throttler.tell(String.format("fasmsg to slow %d", i), ActorRef.noSender());       	
  }

100개의 메시지가 한꺼번에 들어왔다고 가정해보자....API가 되었건, DB가 되었건 ...

보통 동시간에 100개의 요청이 히트가 되어 불안정을 야기한다고 하면 이것을 안정적으로 공급할 장치가 필요합니다.

초당 처리제한 및 특정개수가 넘을시 억제방식등의 설계가 가능하며 밸브조절을 통한 역압력 장치라고도 불리며

실제 액체의 유입량을 제어하는 장치로부터 아이디어를 얻어 구현된 기능입니다.

AKKA는 액터모델로 시작고, AKKAStream으로 마무리를 하는것같습니다.

GraphDSL은 비동기 동시성처리에 있어서 멋진기능들을 제공해줍니다.

https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-graphs.html


주요코드

스프링 Context를 가지고 올수 있는곳 어느곳이나  AKKA사용을 위한 ActorSystem을 획득할수 있습니다.

ActorSystem system = context.getBean(ActorSystem.class);

객체명(TestActor = testActor(카멜) )을 통해 우리가 설계한 액터를 생성할수가 있습니다.

ActorRef testActor = system.actorOf(ext.props("testActor"));


//ActorRef(액터참조)는 로컬 어디든 

액터는 Enpoint 주소 처럼 만들고 접근할수 있습니다. 우리의 로컬로직을 리모트로 확장가능하며 이것은 Rest보다 수백배 빠를수 있습니다. 

   //액터 생성... 
  ActorRef testActor = system.actorOf(ext.props("testActor"),"service1");
  
  //로컬 주소를 통한 액터 선택
  ActorSelection testActor2 = system.actorSelection("user/service1");
  
  //원격 주소를 통한 액터 선택
  ActorSelection testActorRemote = system.actorSelection("akka.tcp://app@localhost:2552/user/service1");
  
  //참조를 통한 전송 ( 로컬 객체관리에 유리 -메모리 전송 )
  testActor.tell("ready spring boot",null);
  
  //액터 셀렉션을 통한 전송
  testActor2.tell("ready spring boot -again", null);
  
  //리모트를 통한 전송 ( 네트워크 전송 )
  testActorRemote.tell("ready spring boot -again too", null);


리모트로 변환하기위해서, 몇가지 설정만 하면됩니다. 

이것은 TCP Server를 만들어서 전송기능을 만드는것보다 훨씬 안정적이고 쉬우며

RestAPI를 만들어 전송을 시키는것보다도 구현코드는 더 간단하며 성능은 훨씬 좋습니다.

KAFKA와 같이 추가적인 관리포인트를 주지않고 이미 운영중인 어플리케이션에 전송기능 탑재가 용이합니다.

하지만 더 중요한것은  거의 모든 스트리머(카프카,래빗MQ,엘라스틱서치,기타 수많은 메시지큐시스템)와

일괄적인 방법으로 상호운영이 용이해집니다.


라운드 로빈같은 특정 라우티까지, 내장시킬수 있는것은 보너스입니다.

참고 :  

액터의 참조명을 통해, 블락킹이 없는 메시지 처리가 가능합니다.

testActor.tell("ready spring boot",null);


작동로그확인

[INFO] [03/25/2018 20:24:00.055] [main] [Application] Starting up
[INFO] [03/25/2018 20:24:00.068] [AkkaTestApp-akka.actor.default-dispatcher-2] [TestActor] Incommessage ready spring boot

실행후 위 로그를 확인하였다면.. Spring과 Akka의 연합작전이 성공한것입니다.


참조:



Write a comment…