Blog
【Akka】Akka Streamsがめっちゃ便利すぎて脳汁が出た話し
こんにちは!Smalgoの來田です。
注意:タイトルは過激ですが内容至って普通なチュートリアル記事です。
仕事でWorkerを作った時に使ってみてめっちゃ便利だと思ったのでAkka Streamsについて書きたいと思います!
まだまだ中の実装の深いところまで追えてるわけじゃないので間違っていたら教えてください。
Akka Streamsとは
Reactive Streams(ノンブロッキングでback pressureな非同期ストリーム処理の標準仕様)のAkka実装
Back Pressureとは
非同期なストリーム処理の場合下記の問題が起きる
- Publisher側の処理が早い場合Subscriber側のバッファーが溢れてしまう
- Subscriberに遠慮してPublisher側の処理を抑えた場合は無駄が多くなってしまう
それをSubscriberが自分が処理できる量をPublisherにリクエストを送ることで無駄なくSubscriberが処理できる量を処理する仕組みがback pressure(背圧制御)
Akka Streamsの使い方
今回使用したversion
1 |
libraryDependencies += "com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0-M3" |
まず一番簡単な例
1 2 3 4 5 6 7 8 9 10 |
implicit val system = ActorSystem() // Actorを使うのでActorSystemが必要 implicit val mat = ActorFlowMaterializer() // 各設定やロジックなど(実装すればApacheSparkのような事も可能) // 上記は今後省略 val source = Source[Int](1 to 5) // Publisherのこと val sink = Sink.foreach[Int](println) // Subscriberのこと source .map(_ * 2) // Stageのこと(このstep一つ一つでActorが立ち上がる) .runWith(sink) // Sinkで結果を受け取る |
結果
1 2 3 4 5 |
2 4 6 8 10 |
この例でやってることはソースを見てもらえればほぼわかると思いますが
1..5までのInt型数値をPublisher(Actor)が供給し、受け取った数値に2を掛けるStage(Actor)に渡され受け取った数値をprintするSubscriber (Actor)が最終的に受け取り結果を返す
もちろんこの時source側がフックでパッシングされるわけではなくsink側のdemand requestによって1, 2, 3, 4, 5の順に送られるので、もしsink側でものすごい時間がかかる処理があった場合でも問題なく動作が行われます(back pressure)
また、基本的にSourceから下流に流した値の処理の順序は保証される(Futureを使ったものに関しては保証させる、させないの2つ方法があります)
次はちょっとこった例
1 2 3 4 5 6 7 8 9 10 |
val source = Source[Int](1 to 5) // Publisherのこと val sink = Sink.foreach[Int](println) // Subscriberのこと val step1 = Flow[Int].map(_ * 2) // Stageのことで処理の流れの1つ val g = FlowGraph { implicit builder => import akka.stream.scaladsl.FlowGraphImplicits._ source ~> step1 ~> sink } g.runWith(sink) |
結果
1 2 3 4 5 |
2 4 6 8 10 |
一個前の例と同じ動作をします。
処理の流れが直感的に見えるので個人的にはこちらの書き方の方が好きなのと、後述するBroadcastなどを使うならこちらの書き方が非常に使いやすいと思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
val source = Source[Int](1 to 5) // Publisherのこと val sink = Sink.foreach(println) // Subscriberのこと val step1 = Flow[Int].map(_ * 2) // Stageのことで処理の流れの1つ val step2 = Flow[Int].map(_ + 100) // Stageのことで処理の流れの1つ val bcast = Broadcast[Int] val merge = Merge[Int] val g = FlowGraph { implicit builder => import akka.stream.scaladsl.FlowGraphImplicits._ source ~> bcast ~> step1 ~> merge ~> sink bcast ~> step2 ~> merge } g.runWith(sink) |
結果
1 2 3 4 5 6 7 8 9 10 |
101 2 102 4 103 6 104 8 105 10 |
このように簡単に処理をブロードキャストしマージするようなことも書けます。
また、この結果でもわかるように処理の順序は保証されています。
今回は触れませんがこの他にも型に合わせて順序を変えたり、値でfilterをかけるなどもデフォで用意されているので簡単に書けますし、ビジネスロジックなどをいれてオレオレ実装をすることも可能です。
Akka Streamsの実用について
これまで簡単な例で説明してきましたが実際に使うときにはどのように使えばよいのかを、Workerを作るときにどのような構成で行ったかを踏まえて簡単に説明したいと思います。
今回実装したWorkerの処理を超簡単に分類すると
- Queueからデータを取得する
- 取得したデータを使ってIDを生成
- 生成したIDをRedisにストアする
- 結果をロギングする
まず、Sourceに何を置くかですがこの内容だとQueueからデータを取得する部分をSourceに置けると思います。
1 2 3 4 5 6 7 8 9 10 11 |
case class Data(json: String) class DequeuePublisher extends ActorPublisher[Data] { def receive: Receive = { case Request(_) => if (totalDemand > 0) { onNext(Data(dequeue())) } case Cancel => // Flow内で何かしらのExceptionを吐いた場合もしくはどこかでcancelを行った場合 context.stop(self) } } |
これがその時の簡単なPublisherの実装になります。ちゃんとやるならハンドリング処理とか色々追加しないといけない部分はありますが今回は割愛しています。
この時新しくでてきたのはtotalDemandとonNextですね。
まずtotalDemandは現在Subscriberが欲しがっているリクエストの数です(後述の設定によって代わります)、この数値以上のリクエストを送ろうとするとExceptionを吐き送ることはできませんのでチェックが必要です。
onNextですがこれは言葉の通りで次の処理へ移行するという意味ですので次の処理へDataを渡したということです。
次にSubscriberですが今回は結果をロギングするを置きましたがロギング処理自体もFlowに入れてしまっても良いと思います。
1 2 3 4 5 6 7 8 9 10 |
class UserScoreSubscriber extends ActorSubscriber { override val requestStrategy = WatermarkRequestStrategy(4, 2) def receive = { case OnNext(id: Id) => log.info(id) case OnError(t: Throwable) => // Flow内で何かしらExceptionを吐いた場合 log.error(t) context.stop(self) } } |
簡単に実装するとこのように書くことができます。
requestStrategyというのはどのタイミングでSubscriberにリクエストを送るかの設定で、自ら実装することもできますし用意されているWatermarkRequestStrategyなどを使っても問題ありません。ちなみにこのWatermarkRequestStrategyは設定したhighWatermarkの数までlowWatermarkの数を切った時にリクエストを送るというもので、今回の場合4, 2で設定しているのでtotalDemandが1になった時に4までリクエストを送るという意味になります。
PublisherとSubscriberが決まったところで間の処理になりますが残った
- 取得したデータを使ってIDを生成
- 生成したIDをRedisにストアする
の2つを1つのStageにしてしまうよりIDを生成するStage, IDをRedisにストアするStageにわけたほうがテストもしやすくモジューラビリティもあがるので良いと思います。
またStageに関しては普通の関数で実装することもできますし、Stageをextendsしてクラスにすることもできますが今回使ってみて、どうしようもないとき(ある特定条件では下流に流さないなど)以外は基本的に関数で行ったほうがテストもしやすくFutureを使った場合も安全に書くことができるので良いのではないかなと思っています。
1 2 3 4 5 6 |
type Id = Long class IdGenenerateStage { def run(data: Data): Future[Id] = { idGenerate(data) // dataを受け取ってFuture[Id]を返す処理 } } |
1 2 3 4 5 |
class IdStoreStage { def run(id: Id): Future[Id] = { idStore(id).map(_ => id) // idを受け取ってRedisにストアするFuture } } |
Stageに関してはこのようにPOJOで書くことができます。
では最後にこの1つ1つの処理を1つのFlowにして実行したいと思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
implicit val sys = ActorSystem() implicit val mat = ActorFlowMaterializer( // input bufferの数でActorの立ち上がる数が決まるのでパフォーマンスに影響がでる ActorFlowMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4) ) val in = Source[Data](Props[DequeuePublisher]) val out = Sink[Id](Props[LoggingSink]) val idGen = Flow[Data].mapAsyncUnordered(new IdGenerateStage.run) val idStore = Flow[Id].mapAsyncUnordered(new IdStoreStage.run) val g = FlowGraph { implicit builder => import akka.stream.scaladsl.FlowGraphImplicits._ in ~> idGen ~> idStore ~> out } g.run() |
このように簡単に1つのFlowを作成することができます。
Futureを返す関数の場合mapAsync or mapAsyncUnorderedを使う必要がありますが違いは関数名の通り順序を保証したいならmapAsyncを保証しなくていいならmapAsyncUnorderedの方を使います。mapAsyncの場合Futureの1つが遅かった場合すべての処理に影響してしまうので特に順序に意味がないような処理ならばmapAsyncUnorderedの方を使ったほうが良いと思います。
Akka StreamsのFault Tolerance
Akka Streams内で何かしらのExceptionを吐いた場合、PublisherのCancel, SubscriberのOnError(t: Throwable)にリクエストを送りその後エラーが起きたActorはStopされる。そのためその後処理を継続したとしてもStopしたActorへはDead Letterになってしまうので動作が継続できません。
また、Akka Streams内のActorを管理しているSupervisorに関しても現状口が開いておらずカズタマイズできない(※M4でflow全体のRestartやStopなどが追加されるみたいです)ので下記2つの方法のどちらかを取る必要があるかと思います。
- Flow内ではExceptionを絶対に起こさないようにハンドリングする(recoverするなど)
- Flow内でExceptionが起きたらすべてのActorを停止して、Graph毎実行し直す
このようにあまり良いFault Toleranceの方法はないのが現状です。
まとめ
今回のようなWorkerを作るときなどに背圧制御などが基本必要だと思いますがそういったことを意識しなくてもAkka Streams側がよしなにやってくれるのは非常に便利だと思いました。また、今回は使いませんでしたがBroadcastやMergeなどの便利機能、直感的に動作を理解できるFlowGraph、Akkaで実装されているのですべてをノンブロッキングで実装すればパフォーマンスに関しても非常に高速と個人的には至れり尽くせりな感じを受けました。
しかし、まだライブラリはexperimentalと実験的なものなので未知のバグなどもあると思いますので使用の際は自己責任かつ最後まで面倒見れる方のみ使用したほうが良いと思います。
最後になりますがReactive StreamsのJava9への一部導入の話しなどもあったり使ってみてすごく便利だと思ったので今後流行るのではないかなと個人的には思っていたりします。
以上ここまでみてくださってありがとうございました
Author