Blog
Akka Cluster Shardingで即時集計系をつくる
こんにちは、CA ProFit-X の松村です。
私たちのプロダクトでは広告配信のシステムにAkkaを使っていて、アプリケーションはAWS/Docker環境に配備して運用しています。今回はAkkaのCluster Shardingという拡張を使って、主にAWS環境を想定した簡単なリアルタイム集計システム(即時集計系)をつくってみたのでその内容をご紹介します。
下ではAkka Cluster Shardingについて簡単に説明した後、試作した即時集計系の実装・永続化と配備について説明します。その後、ワークロードとノード数を増減させた実験とその結果について説明します。
Akka Cluster Sharding について
Akka ClusterはGossip Protocolとφ Accrual Failure Detectorという手法に基づいた非中央集権的なノード群構築を行うAkka拡張です。Akka Cluster ShardingはこのAkka Clusterのノード群にアクターを分散させて動的に再配置し、物理的な位置を意識せずにそれらのアクター群とメッセージをやりとりするためのAkka拡張です。
Akka Cluster Shardingのアプリケーションの構成要素を右図に示しました。複数のアクターで構成されるシャード(Shard)が、1つのノード(ActorSystem)に複数割り当てられます。デフォルトのロジックでは、各ノードに割り当てられるシャード数がなるべく均等になるように割当てられます。ノードの追加・削除がAkka Clusterによって検出されると、指定された割当てロジックに従いAkka Cluster Shardingによってシャードの再配置(rebalance)が行われます。この仕組みによって、単一障害点や性能上のボトルネックが生じにくくなっています。
出現回数を数えるクラスタ
今回はこのAkka Cluster Shardingを使って、 右のようなノード構成の即時集計系をつくってみました。Akka Clusterのノードにはロールを割り当てることができ、frontend, counterはこのロールの名前です。
frontendロールのノードにいるFrontendアクター群でランダムなキーを生成し、counterロールのノードにいるCounterアクター群に送信します。キーの送信はAkka Cluster Shardingによってシャーディングされ、送信先の個々のアクターはキーに基づいて一意に決まります。
Counterアクターは受信したキーごとにその出現回数をカウントし、応答として過去1分間の移動平均を返します。またCounterアクターが保持する状態はこのアクターが決めたタイミングと直列化方式でDynamoDBに保存し、Counterアクターが属するシャードが別のノードに移動した際の復帰処理で使われます。
出現回数を数えるクラスタの実装
各ノードは別々のJVMで実行されるので、javaコマンドの -Dconfig.resource
で指定する設定ファイルの内容によって処理内容を切り替えるようにしました。ノードのロールには frontend, counter に加えて、seed (Akka Clusterにおいてノード群のライフサイクルを管理するノード) を準備しました。
プログラムのメインメソッド(githubからの抜粋)は下のようになります:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// Main.scala object Main { val roles = ConfigFactory.load.getConfig("akka.cluster").getStringList("roles").asScala val primaryRole: String = roles.headOption.getOrElse(sys.error("akka.cluster.roles is empty")) def main(args: Array[String]): Unit = { implicit val system = ActorSystem("cluster") primaryRole match { case "seed" => Counter.startSharding() case "counter" => Counter.startSharding(proxyOnlyMode = false) case "frontend" => Counter.startSharding() Frontend.run(system) case role => sys.error(s"Unexpected role $role") } } } |
設定ファイルの akka.cluster.roles
に指定したリストの最初の項目が seed, counter, frontend だった場合にそれぞれ、後述の Counter.startsharding() の呼び出しによってクラスタへの参加とシャーディングのセットアップを行います。また、frontendの場合はさらにその後 Frontend.run(..) によって定期的なメッセージの送信を開始します。
Counter.startSharding() 下のように実装しました。APIは akka-cluster-sharding 2.4-M1 のものを使っています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
// Counter.scala object Counter { case class Post(id: String) def startSharding(proxyOnlyMode: Boolean = true)(implicit system: ActorSystem): Unit = { ClusterSharding(system).start( shardResolver = shardResolver, idExtractor = idExtractor, typeName = Counter.shardingName, entryProps = if (proxyOnlyMode) None else Some(Props(classOf[Counter])), roleOverride = None, rememberEntries = true) } val shardResolver: ShardRegion.ShardResolver = { case msg: Counter.Post => shardKey(msg.id) } val idExtractor: ShardRegion.IdExtractor = { case msg: Counter.Post => (entryKey(msg.id), msg) } val shardingName = "Counter" val numOfShards = 60 val entriesPerShard = 4 def shardKey(id: String) = (id.hashCode % numOfShards).toString def entryKey(id: String) = (id.hashCode % (numOfShards * entriesPerShard)).toString } |
Frontendアクターからのメッセージ用にcase calss Postを定義し、ClusterSharding(..).start(..) によってクラスタへの参加とシャーディングのセットアップを行います。start(..)の引数にある shardResolver にはメッセージが属するシャードを決めるロジックを、 idExtractorにはメッセージが送られるアクター(entry)を決めるロジックを指定します。また、entryPropsの引数には、start(..) メソッドを呼んだノード上で実行できるアクターを指定できるようになっていました。今回はcounterロールのノード上だけでCounterアクターを実行したかったので、他のノード上でこのメソッドが呼ばれた場合(proxyOnlyModeがfalse)はここにNoneを指定するようにしました。Akka 2.4の最新版ではこのインタフェースは少し変更されているようです。
次に、Frontend.run() は下のように定期的にメッセージを送信アクターを起動する実装をしました:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
object Frontend { def run(system: ActorSystem): Unit = system.actorOf(Props(classOf[Frontend]), "frontend") ! Symbol("Run") } class Frontend extends Actor with ActorLogging { import Frontend._ implicit val ec = context.dispatcher implicit val timeout = Timeout(1.seconds) val counter = ClusterSharding(context.system).shardRegion(Counter.shardingName) val random = new Random() def receive = { case 'Run => context.system.scheduler.schedule(0.millis, 40.millis) { postMessage } } private def postMessage: Unit = { val key = List("key", random.nextInt(numOfKeys)).mkString counter.ask(Counter.Post(key)). map { case value: Double => log.info(s"Received: $value") } } } |
40ミリ秒毎に postMessage が呼ばれ、この中でcounterというActorRefに対してランダムに生成したキーのメッセージを送信します。このcounterは ClusterSharding(system).shardRegion(..)
の評価結果であり、このActorRefに送信されるメッセージは前述の startSharding(..) の指定に従い複数ノードにシャーディングされます。
Counterアクターの実装と状態の永続化
Akka Cluster Sharding ではシャード再配置の際にアクターの状態を復元するために Akke Persistence による永続化を想定しているようです。Akka Persistenceによる永続化にはジャーナリングとスナップショットの2種類あり、CassandraやKafka、Chronicleなど 様々なバックエンドのプラグイン があります。今回は集計の厳密性を求めない代わりに永続化バックエンドの負荷と運用コストを軽減できるような構成を試してみたかったので、これらの永続化プラグインの代わりに、一定の時間間隔でアクターの状態を複数パックしてDynamoDBのデータ項目として保存するような永続化にしてみました。
このような永続化を前提にCounterアクターは、別途ユーティリティとして準備した 1) 移動平均を計算するクラス SimpleMovingAverage 、2) DynamoDBに計算過程を複数パックして保存しロードするトレイト Buckets を利用して、下のように実装しました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
class Counter extends PersistentActor with Buckets[SimpleMovingAverage] { import Counter._ implicit val ec = context.dispatcher override val persistenceId: String = shardingName + "-" + self.path.name override val receiveCommand: Receive = { case Counter.Post(key) => val value: SimpleMovingAverage = getValue(key).getOrElse(defaultState) value.increment setValue(key, value) sender ! value.value } private def defaultState = SimpleMovingAverage(10, 6) // 10秒間隔で6回分遡った移動平均 override def preStart: Unit = loadBuckets.map { _ => context.system.scheduler.schedule(0.seconds, saveBucketsInterval) { saveBuckets } self ! Recover() } override def postStop: Unit = { saveBuckets super.postStop() } } |
PersistentActor は receive の代わりに receiveCommand にアプリケーションロジックのメッセージを書く仕様なので、このメソッド内で Post メッセージを受け付けて出現回数をインクリメントします。getValue, setValueは前述のBucketsトレイトのもので、それぞれ内部状態の取得と更新を行います。アクター起動前の処理 preStart はAkka Persistence標準のものを上書きし、BucketsトレイトのloadBucketsによってDynamoDBから自身の状態を復元してからAkka Persistenceの Recover メッセージを自身に送る実装にしています。
出現回数を数えるクラスタの設定ファイル
各ロールの設定ファイルは下のように、前述の akka.cluster.roles に名前を指定しました。
1 2 3 |
// seed0.conf include "application.conf" akka.cluster.roles = ["seed"] |
1 2 3 |
// counter.conf include "application.conf" akka.cluster.roles = ["counter"] |
1 2 3 |
// frontend.conf include "application.conf" akka.cluster.roles = ["frontend"] |
また、これらが共通に include している application.conf は下のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// application.conf akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { netty.tcp { hostname = "127.0.0.1" hostname = ${?AKKA_HOSTNAME} port = 0 port = ${?AKKA_PORT} } } cluster { seed-nodes = [ "akka.tcp://cluster@127.0.0.1:2551" ] sharding { rebalance-threshold = 3 } } } |
akka.actor.provider には Akka Cluster で決まっているものを指定し、 akka.remote.netty.tcp には他のノードからどのホスト名/ポートで参照してもらうか (hostname / port) を指定します。akka.cluster.seed-nodes にはシードノード群の場所を指定します。このパラーメータはjavaコマンドの引数に -Dakka.cluster.seed-nodes.0=$SEED0
のように指定することもできます。rebalance-threasholdはデフォルトのシャード再配置ロジックでのパラメータで、シャード数の差がどれくらい開いていたら再配置を実行するかのを指定します。
負荷とノード数を変化させる実験
上で作ったアプリケーションの振舞いが、負荷(メッセージ数)とノード数の変化に対してどう変わるかを簡単に実験しました。実験にはまず、実装したコードと設定をDockerでコンテナ化しAWSの Elastic Beanstalkの1つのアプリケーションとしてセットアップしました。さらにその中でシード用、フロントエンド用、カウンタ用の3つの環境を立ち上げ、フロントエンド・カウンタの環境ではオートスケーリングを有効にしました。EC2のタイプはt2.microを使い、メトリクスの埋込と取得にはそれぞれ Kamon と Datadog を利用しました。シナリオは、10分おきにフロントエンドのノード数を 1→2→3→4→3→2→1 と増減させ、5分遅れでカウンタノードも同じように増減させました。
下は、カウンタアクターで受信した秒間メッセージ数のホスト毎の合計の推移です。
グラフ中の13:50にかけて秒間メッセージ数が増えて行く一方で、少し遅れてスケールアウトしたカウンタノード群に徐々にシャードが再配置されてメッセージ受信先が分散している様子が分かります。その後は徐々に秒間メッセージ数も減少しますがカウンタノードもスケールインし、ノード減少のタイミングでシャードが再配置されて残りのノードにメッセージが集約されている様子が分かります。
下は、フロントエンドアクターから送信したメッセージ数(sent)、受け取った応答数(recv)、応答エラー(1秒以内に応答がなかったもの, error)数の秒間レートの推移です。
ノード数が増えているもしくは変化のない間は安定しています。一方カウンタがスケールインするタイミングで、ある程度の割合でエラーが発生しています。これはノードダウンの検出とシャードの再配置が完了するまでのタイムラグが影響しているものと思います。下は、フロントエンドでメッセージを送信してから応答を受け取るまでの時間の平均 (avg)と95パーセンタイル値(95percentile) [ミリ秒] のホスト間の平均の推移です。
カウンタのスケールインの際にのみ応答時間が悪化していて、上のエラー(タイムアウト)の原因がシャード再配置によるタイムラグであることが見てとれます。
このようにスケールイン(またはノード障害)時にシャード再配置のタイムラグの影響があるので、要件によっては再配置の最大同時シャード数や再配置ロジック自体のチューニング、スケールイン前の再配置トリガなどの対応が要りそうです。スケールイン前の再配置トリガについては今回Elastic Beanstalkのオートスケーリングを利用したため難しそうですが、例えばAmazon ECSに配備してオートスケーリングを実装することでスケールインとシャード再配置を密に連携させることができるかもしれません。
まとめ
今回はAkkaのCluster Shardingという拡張を使って、主にAWS環境を想定した簡単なリアルタイム集計システムをつくってみました。DockerコンテナをElastic Beanstalkに配備してノード数を変化させた実験を行い、ノード間でシャードが再配置される様子を確認することができました。今回実装したコードは GitHub上 にありますので、詳細はそちらをご覧ください。
他に確認しておきたい点としては、アプリケーション更新時の配備フロー、メッセージのシャードキー変更時の移行フロー、厳密さと永続化コストとの間の柔軟性、大規模・高スループット時の振舞い、Apache Sparkのエコシステムとの連携などがあります。
このような考慮点はありますが、Akkaのインタフェースで単一障害点や性能上のボトルネックが生じにくい環境を構築できるというのは Akka Cluster Sharding の魅力ではないかと思います。
最後まで読んで頂きありがとうございました。
Copyright about logo
© 2011-2015 Typesafe Inc.
Author