Blog
Akka Throttlerのチュートリアル
こんにちは。AMoAdでエンジニアをやっております、西部と申します。
今回は、AMoAdで外部APIを呼んだ時に便利だったAkkaのThrottlerのチュートリアル的な話をしたいと思います。
Akkaってなんぞ?という方は以下の記事をぜひ読んでみてください。
【Akka入門の入門】Part.1 メッセージを送る
【Akka入門の入門】Part.2 アクター子アクター孫アクター
外部API使用する場合、QPSの制限があるケースがありますよね。
この上限ギリギリまでアクセスしたいという場合にThrottlerは威力を発揮します。
簡単な例でご紹介しましょう。3QPSに抑えるケースです。
sbt
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
version := "1.0" scalaVersion := "2.11.8" resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/" ) libraryDependencies ++= Seq( "org.scalatest" %% "scalatest" % "2.2.6" % "test", "com.typesafe.akka" %% "akka-testkit" % "2.4.8" % "test", "com.typesafe.akka" %% "akka-actor" % "2.4.1", "com.typesafe.akka" %% "akka-contrib" % "2.4.1" ) |
code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import scala.concurrent.duration._ import akka.actor.{Actor, ActorSystem, Props} import akka.contrib.throttle.Throttler.{RateInt, SetTarget} import akka.contrib.throttle.TimerBasedThrottler object ThrottleTest1 extends App { val system = ActorSystem("ThrottleSystem") val printer = system.actorOf(Props[PrintActor]) val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second)) throttler ! SetTarget(Some(printer)) 1 to 5 foreach { throttler ! _ } } class PrintActor extends Actor { def receive = { case x ⇒ println(x) } } |
結果
1 2 3 4 5 6 |
1 2 3 (ここでちょっと停止) 4 5 |
今回は文字列を出力するだけですが、PrintActorを入れ替えて外部APIにアクセスすればOKという感じですね。
3行ずつ出力されることが分かるかと思います。
続いて、今回使用しているTimerBasedThrottlerのテストを書いてみましょう。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import scala.concurrent.duration._ import akka.actor.{ActorSystem, Props} import akka.contrib.throttle.Throttler.{RateInt, SetTarget} import org.scalatest.{Matchers, WordSpecLike} import akka.contrib.throttle.TimerBasedThrottler import akka.testkit.{ImplicitSender, TestKit} import akka.testkit._ class ThrottleSpec extends TestKit(ActorSystem("ThrottleSpec")) with ImplicitSender with WordSpecLike with Matchers { "qps (3 msg/s)" in { val echo = system.actorOf(TestActors.echoActorProps) val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer (1.second.dilated))) throttler ! SetTarget(Some(echo)) 1 to 7 foreach { throttler ! _ } within(1 second) { 1 to 3 foreach { expectMsg(_) } expectNoMsg() } within(1 seconds) { 4 to 6 foreach { expectMsg(_) } } within(1 seconds) { expectMsg(7) } } } |
実行するとテストが通ることが分かると思います。
TimerBasedThrottlerは、引数で受け取ったrateとmessage queueを内部的に保持しており、
rateの期間制限いっぱいまでqueに溜まった内容を処理し、queに処理が残れば次の期間に回すというシンプルな実装になっています。
図にすると以下のような感じです。
TimerBasedThrottlerはシンプルであるが故に以下の注意点があります。
① 開始時間のみを考慮するため、APIを受けるサーバの応答が遅い場合はサーバ側のQPS計測と乖離する可能性がある
② TimerBasedThrottler独自の間隔を使用するので、間隔の取り方によっては制限を超える可能性がある
図にすると以下のようになります。
①
②
タイミングがシビアなケースではTimerBasedThrottlerを拡張するなどの対応が必要かもしれませんね。
と、これだけでは寂しいので、少しだけ発展として以下のケースを考えてみます。
・APIで取得した値を使用して、さらにリクエストを送りたい
・ただし、QPSの合計が3QPSに収まっている必要がある
やりようは色々とあると思いますが、単純にThrottlerを使い回すやり方を紹介します。
code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
import scala.concurrent.duration._ import akka.actor.{Actor, ActorSystem, Props} import akka.contrib.throttle.Throttler.{RateInt, SetTarget} import akka.contrib.throttle.TimerBasedThrottler object ThrottleTest2 extends App { val system = ActorSystem("ThrottleSystem") val caller = system.actorOf(Props[ApiCaller], name = "ApiCaller") val result = caller ! "Parent" } class ApiCaller extends Actor { val caller = context.actorOf(Props[ThrottlerCaller], name = "ThrottlerCaller") var duringInquiry: scala.collection.mutable.Set[String] = scala.collection.mutable.Set.empty[String] def receive: Receive = init def init: Receive = { case message: String => { duringInquiry += message caller ! message context.become(running) } } def running: Receive = { case res: (String, List[String]) => { res._2.foreach{ message => duringInquiry += message caller ! message } } } } class ThrottlerCaller() extends Actor { val httpReq = context.actorOf(Props[RequestActor], name="HttpReq") val throttler = context.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second), name="Throttler") throttler ! SetTarget(Some(httpReq)) def receive: Receive = { case message: String => throttler.forward(message) } } class RequestActor() extends Actor { def receive = { case message @ "Parent" => { println(message) sender() ! (message, List("Child1", "Child2", "Child3")) } case message: String if message.startsWith("Child") => { println(message) val grandChildList = 1 to 3 map ( num => "Grand" + message + "-" + num) sender() ! (message, grandChildList.toList) } case message => { println(message) sender() ! (message, List.empty[String]) } } } |
実行してみると、以下のような出力になると思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Parent Child1 Child2 (ここでちょっと停止) Child3 GrandChild1-1 GrandChild1-2 (ここでちょっと停止) GrandChild1-3 GrandChild2-1 GrandChild2-2 (ここでちょっと停止) GrandChild2-3 GrandChild3-1 GrandChild3-2 (ここでちょっと停止) GrandChild3-3 |
というわけで、今回はThrottlerのチュートリアルということで解説と簡単な実装を紹介しました。
あまり日本語の情報がないみたいなので参考にしていただけると幸いです。
Author