Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

스프링부트와 아카상호 운영하기

Info

자바웹 자바- 웹 프레임웍 진영에 있는 Spring Boot에 AKKA를 내장시키는 방법을 알아보겠습니다.

AKKA 준비과정은 이것보다 훨씬더 간단하지만 SpringBoot에 AKKA라는 외부툴킷을 내재시키고

상호운영하기위해서는 SpringBoot에서 제안하는 표준적인 방법으로 확장시켜야하며 약간의 수고가 필요합니다.

Sprint Boot 을 사용하는 방법은 AKKA주제와 별개로 Spring-BOOT 정리되었습니다.


메이븐설정

Spring Boot 1.5.9 와 Akka 2.4.8 버젼에서 테스트 되었습니다.되었으나

문서작성 기간중에, 버젼이 업데이트 될수있습니다.(몇가지 유용한 Akka 유틸툴들이 상위버젼에서 추가됨)

Expand
titlepom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.psmon</groupId>
<artifactId>cachedb</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cachedb</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<akka.version>2.4.8</akka.version>
</properties>

<dependencies>

.....Boot Start 생략...

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.11</artifactId>
<version>0.19</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
</dependency>

</dependencies>


...

  • 1 - Spring Boot의 진입점과 액터를 사용하는 샘플코드가 있습니다.
  • 2 - 개발자가 설계한 각종 액터들을 집합을 시킬것입니다. 여기서 TestActor는 단순하게 받은 메시지 출력을 합니다.
  • 3 - AkkaSystem의 초기화된 싱글톤 객체를 얻기위해서 SpringApplication에 빈을 등록 해야하며, Spring이 Akka의 라이프사이클을 관리해주게됩니다.
  • 4 -  Akka에서는 ActorOf라는 객체로 액터를 등록하며, Spring에서는 객체를 Bean으로 객체 Bean으로  관리합니다. 이 두가지를 동일한 인터페이스로 작동하게하는 유틸클래스입니다.
  • 5 - 로그환경 통합및 Spring( application.properties) 의 외부설정과 Akka(application.confg) 의 외부설정을 할수 있는곳입니다.  

...

Code Block
languagejava
themeEmacs
titleextension.SpringActorProducer
package com.psmon.cachedb.extension;

import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

/**
 * An actor producer that lets Spring create the Actor instances.
 */
public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext applicationContext;
    private final String actorBeanName;
    private final Object[] args;

    public SpringActorProducer(ApplicationContext applicationContext,
        String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
      }

  this.args = @Overridenull;
    public}
  Actor produce() {
    public SpringActorProducer(ApplicationContext applicationContext, String  return (Actor) applicationContext.getBean(actorBeanName);
actorBeanName, Object... args) {
      }

  this.applicationContext = @OverrideapplicationContext;
     public Class<? extends Actor> actorClass() {this.actorBeanName = actorBeanName;
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName)this.args = args;
    }
}
Code Block
languagejava
themeEmacs
titleextenstion.SpringExtension
package com.psmon.cachedb.extension;

import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
 * Extension to tell Akka how to create beans via Spring.
 */
@Component
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

    /**

    @Override
    public Actor produce() {
        return args == null ? 
              (Actor) applicationContext.getBean(actorBeanName):
     * Used to initialize the Spring application context for the(Actor) extension.
 applicationContext.getBean(actorBeanName, args);
    }

    */@Override
    public void initialize(ApplicationContext applicationContext)Class<? extends Actor> actorClass() {
        return  this.applicationContext = applicationContext(Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }

    /**
     * Create a Props for the specified actorBeanName using the
     * SpringActorProducer class.
     */
    public Props props(String}


Code Block
languagejava
themeEmacs
titleextenstion.SpringExtension
package com.psmon.cachedb.extension;

import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
 * Extension to tell Akka how to create beans via Spring.
 */
@Component
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

    /**
     * Used to initialize the Spring application context for the extension. actorBeanName) {
        return Props.create(SpringActorProducer.class,
     */
    public void initialize(ApplicationContext applicationContext, actorBeanName);
  {
        }
}
Code Block
languagejava
themeEmacs
titleconfig.ApplicationConfiguration
package com.psmon.cachedb.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import com.psmon.cachedb.extension.SpringExtension;

import akka.actor.ActorSystem;

@Configuration
@Lazy
@ComponentScan(basePackages = { "com.psmon.cachedb.services",
    "com.psmon.cachedb.actors", "com.psmon.cachedb.extension" })
public class ApplicationConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private SpringExtension springExtension;


	@Autowired
	private Environment env;

    @Bean
    public ActorSystem actorSystem() {

this.applicationContext = applicationContext;
    }

    /**
     * Create a Props for the specified actorBeanName using the
     * SpringActorProducer class.
     */
    public Props props(String actorBeanName, Object... args) {
        return (args != null && args.length > 0) ?
             Props.create(SpringActorProducer.class,
                     applicationContext,
                    ActorSystem systemactorBeanName, =args) ActorSystem:
             Props.create("AkkaTestApp", akkaConfiguration());
SpringActorProducer.class,
           springExtension.initialize(applicationContext);          applicationContext,
          return system;
    }

	@Bean
	public Config akkaConfiguration() {

		String akkaip = env.getProperty("akka.remote.netty.tcp.ip" actorBeanName);
		String port = env.getProperty("akka.remote.netty.tcp.port");

        // 기존 Akka설정과(application.conf) + 최종계산된 Spring의 설정(appplication.properties)를 머징시키는방법
        // AKKA에서 필요한 기능들 설정은 충분히한후, 변경요소를 Spring설정을 노출하여 변경하는 방식
		return ConfigFactory.load().withValue("akka.remote.netty.tcp.ip", ConfigValueFactory.fromAnyRef(akkaip))
				.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port));

	}

	@EventListener(ApplicationReadyEvent.class)
	public void doSomethingAfterStartup() {
	}
	
}

Step2 - 로그환경 준비하기 ( 5 )

동시성 메시지 처리는 로그파악이 중요함으로, 꼭 설정을한후 사용을 합니다.

그래야 초반부터 System.out.print(...) 와같은 코드를 사용하지 않고 로그기반으로

Trace를 하는 습관을 가지게 됩니다. 여기서는 logback 기본 콘솔모드 설정을 사용하였으며

logback은 다양한 방법의 로그방식을 추가할수 있는 유용한 로그 라이브러리 입니다.

     }   
}


Code Block
languagejava
themeEmacs
titleconfig.ApplicationConfiguration
package com.psmon.cachedb.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.event.EventListener;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import com.psmon.cachedb.extension.SpringExtension;

import akka.actor.ActorSystem;

@Configuration
@Lazy
@ComponentScan(basePackages = { "com.psmon.cachedb.services",
    "com.psmon.cachedb.actors", "com.psmon.cachedb.extension" })
public class ApplicationConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private SpringExtension springExtension;


	@Autowired
	private Environment env;

    @Bean
    public ActorSystem actorSystem() {

        ActorSystem system = ActorSystem
            .create("AkkaTestApp", akkaConfiguration());
        springExtension.initialize(applicationContext);        
        return system;
    }

	@Bean
	public Config akkaConfiguration() {

		String akkaip = env.getProperty("akka.remote.netty.tcp.ip");
		String port = env.getProperty("akka.remote.netty.tcp.port");

        // 기존 Akka설정과(application.conf) + 최종계산된 Spring의 설정(appplication.properties)를 머징시키는방법
        // AKKA에서 필요한 기능들 설정은 충분히한후, 변경요소를 Spring설정을 노출하여 변경하는 방식
		return ConfigFactory.load().withValue("akka.remote.netty.tcp.ip", ConfigValueFactory.fromAnyRef(akkaip))
				.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port));

	}

	@EventListener(ApplicationReadyEvent.class)
	public void doSomethingAfterStartup() {
	}
	
}


Step2 - 로그환경 준비하기 ( 5 )

동시성 메시지 처리는 로그파악이 중요함으로, 꼭 설정을한후 사용을 합니다.

그래야 초반부터 System.out.print(...) 와같은 코드를 사용하지 않고 로그기반으로

Trace를 하는 습관을 가지게 됩니다. 여기서는 logback 기본 콘솔모드 설정을 사용하였으며

logback은 다양한 방법의 로그방식을 추가할수 있는 유용한 로그 라이브러리 입니다.

Code Block
languagejava
themeEmacs
#application.propertis
logging.config=classpath:logback-spring.xml


#application.conf
akka {

  loggers = ["akka.event.Logging$DefaultLogger"]

  loglevel = "INFO"

  stdout-loglevel = "ERROR"

}


#logback-spring.xml
<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>

</configuration>



Step2.1 - 로그변경(Slf4jLogger)

Code Block
languagejava
themeEmacs
# application.conf
akka {
No Format
#application.propertis
logging.config=classpath:logback-spring.xml


#application.conf
akka {

  loggers = ["akka.event.slf4j.Logging$DefaultLoggerSlf4jLogger"]

  loglevel = "INFODEBUG"

  stdout-loglevellog-config-on-start = "ERROR"
on
}


#logback-spring# pom.xml
<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT" />
  </root>

</configuration><dependency>
	<groupId>com.typesafe.akka</groupId>
	<artifactId>akka-slf4j_2.12</artifactId>
	<version>${akka.version}</version>
</dependency>	


# logback-spring.xml
<logger name="org.springframework.data" level="error" />
<logger name="akka" level="error" />



Step3. 액터설계하기

받은 메시지를 단순하게 출력을 하는 TestActor입니다.

...

Info


Code Block
languagejava
themeEmacs
title보너스코드
  final Materializer materializer = ActorMaterializer.create(system);
  
  final ActorRef throttler =
    Source.actorRef(1000, OverflowStrategy.dropNew())
      .throttle(1,  FiniteDuration.create(1, TimeUnit.SECONDS), 10, ThrottleMode.shaping())
      .to(Sink.actorRef(testActor, NotUsed.getInstance() ))
      .run(materializer);
  	        
  for(int i=0;i<100;i++) {
  	throttler.tell(String.format("fasmsg to slow %d", i), ActorRef.noSender());       	
  }

100개의 메시지가 한꺼번에 들어왔다고 가정해보자....API가 되었건, DB가 되었건 ...

보통 동시간에 100개의 요청이 히트가 되어 불안정을 야기한다고 하면 이것을 안정적으로 공급할 장치가필요합니다장치가 필요합니다.

초당 처리제한 및 특정개수가 넘을시 억제방식등의 설계가 가능하며 밸브조절을 통한 역압력 장치라고도 불리며

실제 액체의 유입량을 제어하는 장치로부터 아이디어를 얻어 구현된 기능입니다.

AKKA는 액터모델로 시작고, AKKAStream으로 마무리를 하는것같습니다.

GraphDSL은 비동기 동시성처리에 있어서 멋진기능들을 제공해줍니다.

https://doc.akka.io/docs/akka/2.5.4/scala/stream/stream-graphs.html

...

실행후 위 로그를 확인하였다면.. Spring과 Akka의 연합작전이 성공한것입니다.


참조: