Page History
스프링부트와 아카상호 운영하기
Info |
---|
자바웹 자바- 웹 프레임웍 진영에 있는 Spring Boot에 AKKA를 내장시키는 방법을 알아보겠습니다. AKKA 준비과정은 이것보다 훨씬더 간단하지만 SpringBoot에 AKKA라는 외부툴킷을 내재시키고 상호운영하기위해서는 SpringBoot에서 제안하는 표준적인 방법으로 확장시켜야하며 약간의 수고가 필요합니다. Sprint Boot 을 사용하는 방법은 AKKA주제와 별개로 Spring-BOOT 정리되었습니다. |
메이븐설정
Spring Boot 1.5.9 와 Akka 2.4.8 버젼에서 테스트 되었습니다.되었으나
문서작성 기간중에, 버젼이 업데이트 될수있습니다.(몇가지 유용한 Akka 유틸툴들이 상위버젼에서 추가됨)
Expand | ||
---|---|---|
| ||
<?xml version="1.0" encoding="UTF-8"?> <groupId>com.psmon</groupId> <name>cachedb</name> <parent> <properties> <dependencies> .....Boot Start 생략... <dependency> </dependencies> |
Application Layout
- 1 - Spring Boot의 진입점과 액터를 사용하는 샘플코드가 있습니다.
- 2 - 개발자가 설계한 각종 액터들을 집합을 시킬것입니다. 여기서 TestActor는 단순하게 받은 메시지 출력을 합니다.
- 3 - AkkaSystem의 초기화된 싱글톤 객체를 얻기위해서 SpringApplication에 빈을 등록 해야하며, Spring이 Akka의 라이프사이클을 관리해주게됩니다.
- 4 - Akka에서는 ActorOf라는 객체로 액터를 등록하며, Spring에서는 객체를 Bean으로 객체 Bean으로 관리합니다. 이 두가지를 동일한 인터페이스로 작동하게하는 유틸클래스입니다.
- 5 - 로그환경 통합및 Spring( application.properties) 의 외부설정과 Akka(application.confg) 의 외부설정을 할수 있는곳입니다.
Step1 - Akka 사용 준비하기 - (3,4번)
한번만 설정을 하면 이후 거의 변경될일이 없습니다.
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package com.psmon.cachedb.extension; import akka.actor.Actor; import akka.actor.IndirectActorProducer; import org.springframework.context.ApplicationContext; /** * An actor producer that lets Spring create the Actor instances. */ public class SpringActorProducer implements IndirectActorProducer { private final ApplicationContext applicationContext; private final String actorBeanName; private final Object[] args; public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName) { this.applicationContext = applicationContext; this.actorBeanName = actorBeanName; } @Override this.args = null; public Actor produce() {} returnpublic SpringActorProducer(Actor)ApplicationContext applicationContext.getBean(actorBeanName); , String actorBeanName, Object... args) { } this.applicationContext = @OverrideapplicationContext; public Class<? extends Actor> actorClass() {this.actorBeanName = actorBeanName; return (Class<? extends Actor>) applicationContext.getType(actorBeanName)this.args = args; } } | ||||||
Code Block | ||||||
| ||||||
package com.psmon.cachedb.extension; import akka.actor.Extension; import akka.actor.Props; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; /** * Extension to tell Akka how to create beans via Spring. */ @Component public class SpringExtension implements Extension { private ApplicationContext applicationContext; /** @Override public Actor produce() { return args == null ? (Actor) applicationContext.getBean(actorBeanName): * Used to initialize the Spring application context for the(Actor) extension. applicationContext.getBean(actorBeanName, args); } */@Override public void initialize(ApplicationContext applicationContextClass<? extends Actor> actorClass() { return this.applicationContext = applicationContext(Class<? extends Actor>) applicationContext.getType(actorBeanName); } /** * Create a Props for the specified actorBeanName using the * SpringActorProducer class. */ public Props props(String } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package com.psmon.cachedb.extension; import akka.actor.Extension; import akka.actor.Props; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; /** * Extension to tell Akka how to create beans via Spring. */ @Component public class SpringExtension implements Extension { private ApplicationContext applicationContext; /** * Used to initialize the Spring application context for the extension.actorBeanName) { return Props.create(SpringActorProducer.class, */ public void initialize(ApplicationContext applicationContext, actorBeanName); { } }this.applicationContext = applicationContext; } /** * Create a Props for the specified actorBeanName using the * SpringActorProducer class. */ public Props props(String actorBeanName, Object... args) { return (args != null && args.length > 0) ? Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args) : Props.create(SpringActorProducer.class, applicationContext, actorBeanName); } } |
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
package com.psmon.cachedb.config;
import org.springframework.beans.factory.annotation.Autowired;
import org. | ||||||
Code Block | ||||||
| ||||||
package com.psmon.cachedb.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.psmon.cachedb.extension.SpringExtension;
import akka.actor.ActorSystem;
@Configuration
@Lazy
@ComponentScan(basePackages = { "com.psmon.cachedb.services",
"com.psmon.cachedb.actors", "com.psmon.cachedb.extension" })
public class ApplicationConfiguration {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private SpringExtension springExtension;
@Autowired
private Environment env;
@Bean
public ActorSystem actorSystem() {
ActorSystem system = ActorSystem
.create("AkkaTestApp", akkaConfiguration());
springExtension.initialize(applicationContext);
return system;
}
@Bean
public Config akkaConfiguration() {
String akkaip = env.getProperty("akka.remote.netty.tcp.ip");
String port = env.getProperty("akka.remote.netty.tcp.port");
// 기존 Akka설정과(application.conf) + 최종계산된 Spring의 설정(appplication.properties)를 머징시키는방법
// AKKA에서 필요한 기능들 설정은 충분히한후, 변경요소를 Spring설정을 노출하여 변경하는 방식
return ConfigFactory.load().withValue("akka.remote.netty.tcp.ip", ConfigValueFactory.fromAnyRef(akkaip))
.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port));
}
@EventListener(ApplicationReadyEvent.class)
public void doSomethingAfterStartup() {
}
}
|
Step2 - 로그환경 준비하기 ( 5 )
동시성 메시지 처리는 로그파악이 중요함으로, 꼭 설정을한후 사용을 합니다.
...
logback은 다양한 방법의 로그방식을 추가할수 있는 유용한 로그 라이브러리 입니다..
Code Block | ||||
---|---|---|---|---|
| ||||
#application.propertis
logging.config=classpath:logback-spring.xml
#application.conf
akka {
loggers = ["akka.event.Logging$DefaultLogger"]
loglevel = "INFO"
stdout-loglevel = "ERROR"
}
#logback-spring.xml
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration> |
Step2.1 - 로그변경(Slf4jLogger)
Code Block | ||||
---|---|---|---|---|
| ||||
# application.conf
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
log-config-on-start = on
}
# pom.xml
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
# logback-spring.xml
<logger name="org.springframework.data" level="error" />
<logger name="akka" level="error" /> | ||||
No Format | ||||
#application.propertis
logging.config=classpath:logback-spring.xml
#application.conf
akka {
loggers = ["akka.event.Logging$DefaultLogger"]
loglevel = "INFO"
stdout-loglevel = "ERROR"
}
#logback-spring.xml
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration> |
Step3. 액터설계하기
받은 메시지를 단순하게 출력을 하는 TestActor입니다.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package com.psmon.cachedb.actors; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; @Component @Scope("prototype") //액터객체는 수십만개를 생성해도 가볍습니다. 힙영역에 원하는 만큼 생성해야함으로 싱글톤으로 작동되는것을 방지합니다.(스프링특성) public class TestActor extends UntypedActor { private final LoggingAdapter log = Logging .getLogger(getContext().system(), "TestActor"); @Override public void onReceive(Object message) throws Exception { if(message instanceof String) { log.info("Incommessage {}", message); }else { log.info("Unhandle Message {}", message); } } } |
Step.4 액터사용하기
Code Block | ||||
---|---|---|---|---|
| ||||
package com.psmon.cachedb; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import com.psmon.cachedb.extension.SpringExtension; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.event.Logging; import akka.event.LoggingAdapter; @SpringBootApplication public class CachedbApplication { public static void main(String[] args) { ApplicationContext context = SpringApplication.run(CachedbApplication.class, args); ActorSystem system = context.getBean(ActorSystem.class); final LoggingAdapter log = Logging.getLogger(system, "Application"); log.info("Starting up"); SpringExtension ext = context.getBean(SpringExtension.class); ActorRef testActor = system.actorOf(ext.props("testActor")); testActor.tell("ready spring boot",null); } } |
...
Info | |||||||||
---|---|---|---|---|---|---|---|---|---|
100개의 메시지가 한꺼번에 들어왔다고 가정해보자....API가 되었건, DB가 되었건 ... 보통 동시간에 100개의 요청이 히트가 되어 불안정을 야기한다고 하면 이것을 안정적으로 공급할 장치가필요합니다. 초당 처리제한 및 특정개수가 넘을시 억제방식등의 설계가 가능하며 역압력 장치라고도 불리며 실제 액체의 유입량을 제어하는 장치로부터 아이디어를 받은 패턴입니다.공급할 장치가 필요합니다. 초당 처리제한 및 특정개수가 넘을시 억제방식등의 설계가 가능하며 밸브조절을 통한 역압력 장치라고도 불리며 실제 액체의 유입량을 제어하는 장치로부터 아이디어를 얻어 구현된 기능입니다.
|
주요코드
스프링 Context를 가지고 올수 있는곳 어느곳이나 AKKA사용을 위한 ActorSystem을 획득할수 있습니다.
No Format |
---|
ActorSystem system = context.getBean(ActorSystem.class); |
객체명(TestActor = testActor(카멜) )을 통해 우리가 설계한 액터를 생성할수가 있습니다.
No Format |
---|
ActorRef testActor = system.actorOf(ext.props("testActor")); //ActorRef(액터참조)는 로컬 어디든 |
액터는 Enpoint 주소 처럼 만들고 접근할수 있습니다. 우리의 로컬로직을 리모트로 확장가능하며 이것은 Rest보다 수백배 빠를수 있습니다.
No Format |
---|
//액터 생성... ActorRef testActor = system.actorOf(ext.props("testActor"),"service1"); //로컬 주소를 통한 액터 선택 ActorSelection testActor2 = system.actorSelection("user/service1"); //원격 주소를 통한 액터 선택 ActorSelection testActorRemote = system.actorSelection("akka.tcp://app@localhost:2552/user/service1"); //참조를 통한 전송 ( 로컬 객체관리에 유리 -메모리 전송 ) testActor.tell("ready spring boot",null); //액터 셀렉션을 통한 전송 testActor2.tell("ready spring boot -again", null); //리모트를 통한 전송 ( 네트워크 전송 ) testActorRemote.tell("ready spring boot -again too", null); |
...
리모트로 변환하기위해서, 몇가지 설정만 하면됩니다.
이것은 TCP Server를 만들어서 전송기능을 만드는것보다 훨씬 안정적이고 쉬우며
RestAPI를 만들어 전송을 시키는것보다도 구현코드는 더 간단하며 성능은 훨씬 좋습니다.
KAFKA와 같이 추가적인 관리포인트를 주지않고 이미 운영중인 어플리케이션에 전송기능 탑재가 용이합니다.
하지만 더 중요한것은 거의 모든 스트리머(카프카,래빗MQ,엘라스틱서치,기타 수많은 메시지큐시스템)와
일괄적인 방법으로 상호운영이 용이해집니다.
라운드 로빈같은 특정 라우티까지, 내장시킬수 있는것은 보너스입니다.
참고 :
- https://doc.akka.io/docs/akka/2.5/remoting.html ( 리모트 전송 )
- https://developer.lightbend.com/docs/alpakka/current/ ( 모든 스트리머와 상호연동하기)
- https://doc.akka.io/docs/akka-stream-kafka/current/home.html ( 카프카 모듈 )
- http://www.reactive-streams.org/ ( 서로다른 구현체의 연동 핵심은, reactive-streams에서 출발하였습니다.)
액터의 참조명을 통해, 블락킹이 없는 메시지 처리가 가능합니다.
No Format |
---|
testActor.tell("ready spring boot",null); |
작동로그확인
No Format |
---|
[INFO] [03/25/2018 20:24:00.055] [main] [Application] Starting up [INFO] [03/25/2018 20:24:00.068] [AkkaTestApp-akka.actor.default-dispatcher-2] [TestActor] Incommessage ready spring boot |
실행후 위 로그를 확인하였다면.. Spring과 Akka의 연합작전이 성공한것입니다.
참조:
- Akka+Spring : http://git.webnori.com/projects/WEBF/repos/spring_cachedb/commits/9a0b83d8a07cbe053dc20514546ac3fae05a8e23
- Akka Document : https://akka.io/
- Akka with Spring : https://www.baeldung.com/akka-with-spring