Blog
【Akka入門の入門】Part.1 メッセージを送る
初めまして、新卒の増田と申します(^o^)
入社後はJavaを使っていて、チームを異動して初めてScalaを触り始めた・・・
という頃に、Akkaというフレームワークを使うので勉強するように言われました(^o^;)
まず入門書を買おうとしたのですが、日本語の本がない・・・(T_T)
ドキュメントも英語だし何書いてあるのかさっぱりわからない・・・(T_T)
Scalaも始めたばっかでよくわからない・・・(T_T)
という状況だったので、英語でもドキュメントより入門書の方がまだ理解できるかも・・・と思い『Developing an Akka Edge』という本を購入しました。
その本を読みながら、ドキュメントを読みながら、先輩に聞いたり調べながらAkkaについて勉強したことをスライドにまとめました!
ただとても長いので、このブログでは簡潔なまとめ+説明のための簡単なコードを書いていきます(^o^)
(↓でもスライドも見ていただけると嬉しいです(^o^)↓)
[slideshare id=43156127&doc=devakka1-3-150102142713-conversion-gate01]
Part.1 は入門の入門で、とりあえずアクターを動かしてみます。
(Developing an Akka Edgeの章とこの記事のPartは無関係です。)
Akkaでは、アクターを使った並列処理ができます。
アクターはメッセージを受信すると、メッセージに応じて定義されたふるまいをします。
非同期で動いているアクター同士がやりとりするという仕組みです。
(並列処理なのですが、スレッドの話は後の回のどこかででてくる予定なので今は気にしないでください・・・)
スレッド間で資源を共有して、「誰かが使っている間はロックする」という並列処理だと
ロックされている間待ち時間が発生したり、デッドロックが発生したり…
ロックを使ったアプローチがなぜ良くないのかという話が延々と本に書いてありました。
Akkaの良いところは
並列で動かすスレッド数を増やしても処理速度が落ちにくい
耐障害性を上げるためのいろんな仕組みがある
どのマシン上で動いているか意識しなくてよい(location transparency)
などいろいろですが
使いこなせれば便利そうな機能がいっぱいありすぎてよくわからないので、次回以降少しずつ紹介できたらなと思います。
アクターを使う手順としてはこんな感じです。
①アクターに、どのメッセージを受け取ったら何をするか定義する
②ActorSystemからアクターを生成する
③作ったアクターにメッセージを送る
sbtでAkkaを使えるようにします。
build.sbt
1 |
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.9" |
①アクターに、どのメッセージを受け取ったら何をするか定義する
Actorを継承したクラスを定義して、その中にreceiveメソッドを定義します。
パターンマッチで、どのメッセージならどの処理かを書きます。
1 2 3 4 5 6 7 |
import akka.actor.Actor class HelloActor extends Actor{ def receive = { case "Hello" => println("World") } } |
②ActorSystemからアクターを生成する
ActorSystemを作ります。
ActorSystemのactorOfメソッドとPropsを使ってさっきのアクターを生成します。
Propsはアクターを作るのに必要で、作ったアクターをsystemに登録するようなイメージです。
1 2 3 4 |
import akka.actor.{ActorSystem,Props} val system = ActorSystem("system") val actor = system.actorOf(Props[HelloActor]) |
③作ったアクターにメッセージを送る
さっきの変数と
! メソッドを使ってメッセージを送ります。
1 |
actor ! "Hello" |
実行すると・・・
1 2 |
$sbt run World |
と表示されるはずです
これでアクターにメッセージを送る事ができたのですが
送りっぱなしではなくアクターが処理した結果をこの先の処理に使いたい場合があると思います。
(アクターに何か計算させたりDBにアクセスさせたりする時など)
アクターから返事をもらう時は、Futureという型を使います。
Future型の変数の中には、処理済み・処理中・まだ処理されていない(永遠にされないかもしれない)結果が入ります。
例を見ないとイメージできないと思うので、さきほどの手順でアクターを作ってメッセージを送ってみましょう
①アクターに、どのメッセージを受け取ったら何をするか定義する
さきほどのHelloActorに、メッセージを受け取ったら返事を返す処理を追記します。
1 2 3 4 5 6 |
class HelloActor extends Actor{ def receive = { case "Hello" => println("World") case "How are you?" => sender ! "I'm fine thank you!" } } |
sender はメッセージの送り主のアクターを参照するものです
ちなみに
selfで自分を参照できます
"How are you?" というメッセージが来たら、送り主に "I'm fine thank you!" と送るようにします。
②ActorSystemからアクターを生成する
はさきほどと同じです。
③作ったアクターにメッセージを送る
アクターからの返事を格納するFutureを使う時に、いくつか必要なことがあります。
timeoutとexecution contextの設定です。
timeoutはアクターからFutureが返ってくるのを待つ時間です。
1 |
implicit val timeout = Timeout(5 seconds) |
execution contextについてはスレッドの話のところで説明します。
1 |
import scala.concurrent.ExecutionContext.Implicits.global |
そして、アクターにメッセージを送るのですが
さっき使った
! メソッドではなく
? メソッドを使います。
戻り値がFutureなので、変数に格納します
1 |
val reply = actor ? "How are you?" |
Futureの中身を取り出す処理が必要です。
1 2 3 |
reply.onSuccess{ case msg:String => println("reply from actor: " + msg) } |
こうするとアクターが送ってきた文字列を処理の中で使えます。
さっきは
onSuccess メソッドを使いましたが
アクターの処理が遅くてタイムアウトになったり、エラーなどで返事が返ってこない場合の処理を書きたい時は
1 2 3 4 |
reply.onComplete{ case Success(msg: String) => println("reply from actor: " + msg) case Failure(_) => println("Message Failure") } |
onComplete メソッドを使います。
手順②と③をまとめたものがこちらです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import akka.actor.{ActorSystem,Props} import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration._ import scala.util.{Success,Failure} import scala.concurrent.ExecutionContext.Implicits.global object Boot extends App{ val system = ActorSystem("system") val actor = system.actorOf(Props[HelloActor]) implicit val timeout = Timeout(5 seconds) val reply = actor ? "How are you?" reply.onSuccess{ case msg:String => println("reply from actor: " + msg) } } |
送る側と受け取る側でメッセージの型が分からなくて型安全じゃなくなってしまいそうなのですが
メッセージのプロトコルとして定義したcase classを使ったり、mapToメソッドを使ってFutureの型を指定したり…という工夫をします。
今回本当に入門の入門で1つのアクターにメッセージを送っただけになっちゃいましたが、次回以降は複数のアクターを使っていろいろお見せできたらいいなと思います。
Author