네트워크로의 데이터 주입 속도의 상한을 정해 제어하고 네트워크에서 트래픽 체증을 일정하게 유지한다. 일정한 유출 속도(유출 속도는 고정된 값)를 제한하여 버스트 유입 속도를 부드럽게 한다.

  • 고정 용량의 버킷에 다양한 유량의 물이 들어오면 버킷에 담기고 그 담긴물은 일정량 비율로 떨어진다.
  • 들어오는 물의 양이 많아 버킷의 용량을 초과하게 되면 그 물은 버린다.
  • 입력 속도가 출력 속도보다 크면 버킷에서 누적이 발생하고 누적이 버킷 용량보다 큰 경우 오버플로가 발생하여 데이터 패킷 손실이 발생할 수 있다.



구현 - SCALA

import scala.concurrent.Future
import akka.actor._
import scala.collection.immutable.{ Queue ⇒ Q }
import scala.concurrent.duration._
import scala.util.{Success, Failure}
 
// A simple actor that prints whatever it receives
class PrintActor extends Actor {
  def receive = {
    case x ⇒ println(x)
  }
}


object LeakyBucketThrottler {
  case object Tick
  
  // Messages, as we queue them to be sent later
  final case class Message(message: Any, sender: ActorRef)
  
  final case class SetTarget(optTarget: Option[ActorRef])
}

/**
  * A throttling actor implementing the Leaky Bucket algorithm
  * @param restorePeriod the rate at which request quota increases over time, up to the maximum request quota 
  * @param burstRate maximum request quota, how many requests are allowed at once
  */
class LeakyBucketThrottler(var restorePeriod: FiniteDuration, var burstRate: Int) extends ActorWithTimer {
  import LeakyBucketThrottler._
  
  var requestQuota = burstRate //keeps track of current request quota
  var optTarget: Option[ActorRef] = None
  var messageQueue: Q[Message] = Q()
  private val restoreTimer = "restoreTimer"
  
  def receive: Receive = {
	case _: Tick.type => 
	  if (!messageQueue.isEmpty && optTarget.isDefined) {
		val (msg, newQ) = messageQueue.dequeue
		optTarget.get.tell(msg.message, msg.sender)
		messageQueue = newQ
	  } else if (requestQuota < burstRate) {
		requestQuota += 1
	  } else {
		log.debug("Quota fully restored!")
		cancelTimer(restoreTimer)
	  }
	case SetTarget(newOptTarget) => optTarget = newOptTarget
	case msg if optTarget.isDefined =>
	  if (requestQuota > 0) {
		optTarget.get.tell(msg, context.sender)
		requestQuota -= 1
		if (requestQuota == 0) setTimer(restoreTimer, Tick, restorePeriod, true)
	  } else {
		messageQueue = messageQueue enqueue Message(msg, context.sender)
	  }
	case msg => messageQueue = messageQueue enqueue Message(msg, context.sender)
  }
}
 
    

object Throttler extends App {
  import LeakyBucketThrottler._
  
  val system = ActorSystem()
  //implicit val execCtx = system.dispatcher
  
  val printer = system.actorOf(Props[PrintActor])
  // The throttler for this example, setting the rate
  val throttler = system.actorOf(Props(classOf[LeakyBucketThrottler], 3.seconds, 3))
  // Set the target
  throttler ! SetTarget(Some(printer))
  // These three messages will be sent to the target immediately
  throttler ! "1"
  throttler ! "2"
  println("waiting for 3 seconds to avoid throttling")
  Thread.sleep(3000)
  throttler ! "3"
  throttler ! "4"
  // These two will be throttled
  throttler ! "5"
  throttler ! "6"
}



BackPressure

유체(트래픽) 흐름을 제어하는 더 좋은 방식

링크



참고링크




  • No labels