Blog
[Strata Data Conference 2017 in New York] – Tutorials
アドテク本部の水野です。
9/25 – 9/29にニューヨークで開催されたStrata Data Conference 2017に参加してきました.
Strata Data Conferenceは、機械学習、データ管理、ストリーム処理、分散処理といった、大規模データに関するトピックを幅広く扱うカンファレンスの1つです.この記事では本会議前日に受講させていただいたチュートリアルの内容を簡単に紹介したいと思います.
今回私が受講したのは下記のチュートリアルです.
Unraveling data with Spark using deep learning and other algorithms from machine learning, Vartika Singh (Cloudera), Jeffrey Shmain (Cloudera)
一言で言うと,SparkMLで特徴量の抽出のための前処理、モデルパラメータの学習、モデルのバリデーションといった機械学習でよく用いられる操作が、大規模データにも適用可能な分散環境でも簡単に実現できますよという内容のチュートリアルでした.
以前からSparkにはMLlibという機械学習ライブラリが付属していたのですが、そのうちAPIがDataFrameベースのものをSparkMLと呼んでいるようです.ちなみにDataFrameは、Spark固有の分散処理向けのデータ構造であるRDDにスキーマの概念が追加されたもので、RDBのようにカラム名や型が付いています.
1 2 3 4 |
val session = SparkSession.builder().master("local[2]").getOrCreate() // SparkSessionの作成 (masterはここでは2スレッドのstand-aloneモードで起動していますが、ここは適宜) val df = session.read.json("gs://spark-ml-test/data.json") // DataFrameにJSONファイルをロード df.printSchema() df.show() |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
root |-- name: string (nullable = false) |-- age: integer (nullable = false) |-- occupation: string (nullable = false) |-- label: integer (nullable = false) +-----+---+----------+-----+ | name|age|occupation|label| +-----+---+----------+-----+ |Alice| 25| engineer | 1 | | Bob | 22| student | 0 | |Cathy| 25| banker | 1 | +-----+---+----------+------ |
SparkMLでは各種TransformerやEstimatorを組み合わせることによって,DataFrameに対する処理として前処理やモデルパラメータの学習を実装することができます.機械学習で一般的によく使われるような処理は組み込みのコンポーネントとして実装されており,簡単なものならそれらを組み合わせるだけでも作ることができます.
例えば,カテゴリカル変数に対する前処理としてしばしば用いられるインデクシング処理は次のようにStringIndexerを使うだけで簡単に実現することができます.
1 2 3 4 5 6 |
import org.apache.spark.ml.feature.StringIndexer val indexers = Seq("name", "age", "occupation").map { name => new StringIndexer().setInputCol(name).setOutputCol(s"${name}_index") } val indexedDF = new Pipeline().setStages(indexers.toArray).fit(df).transform(df) indexedDF.show() |
1 2 3 4 5 6 7 |
+-----+---+----------+-----+----------+---------+----------------+ | name|age|occupation|label|name_index|age_index|occupation_index| +-----+---+----------+-----+----------+---------+----------------+ |Alice| 25| engineer | 1 | 0.0 | 0.0 | 2.0 | | Bob | 22| student | 0 | 2.0 | 1.0 | 1.0 | |Cathy| 25| banker | 1 | 1.0 | 0.0 | 0.0 | +-----+---+----------+-----+----------+---------+----------------+ |
このように,StringIndexer#setInputColに渡したカラムの内容を値ごとにindex値に変換したものが,StringIndexer#setOutputColで設定したカラムに格納されます.
ちなみに,StringIndexerなどのEstimatorはPipelineStageというtraitのサブクラスになっているので,上記の例のようにパイプラインとして一度に適用することもできるようになっています.
次に,後続の学習に使用する特徴量を選択し,一つのカラムにまとめる必要があります.
これもVectorAssemblerを使えば次のように簡単に実現できます.
1 2 3 4 5 6 7 |
import org.apache.spark.ml.feature.VectorAssembler val assembler = new VectorAssembler() .setInputCols(Array("name_index", "age_index", "occupation_index")) .setOutputCol("features") val assembledDF = assembler.transform(indexedDF) assembledDF.show() |
1 2 3 4 5 6 7 |
+-----+---+----------+-----+----------+---------+----------------+-------------+ | name|age|occupation|label|name_index|age_index|occupation_index| features | +-----+---+----------+-----+----------+---------+----------------+-------------+ |Alice| 25| engineer | 1 | 0.0 | 0.0 | 2.0 |[0.0,0.0,2.0]| | Bob | 22| student | 0 | 2.0 | 1.0 | 1.0 |[2.0,1.0,1.0]| |Cathy| 25| banker | 1 | 1.0 | 0.0 | 0.0 |[1.0,0.0,0.0]| +-----+---+----------+-----+----------+---------+----------------+-------------+ |
この処理により, VectorAssembler#setInputColsで渡したカラム集合が,VectorAssembler#setOutputColで指定したカラム(ここではfeatures)にVectorとしてまとめられます.
ここまでで変換したDataFrameをモデルに入力として渡すことによって,学習を行うことができます.
モデルに関しても簡単なモデルなら組み込みのモデルが用意されているので,ハイパーパラメータを指定するだけで簡単に構築できます.
例えばロジスティック回帰だと次のようになります.
1 2 3 4 5 6 7 |
import org.apache.spark.ml.classification.LogisticRegression val model = new LogisticRegression() .setMaxIter(50) .setFeaturesCol("features") .setLabelCol("label") .fit(assembledDF) |
あとは学習したモデルを予測したいデータ集合に対して適用するだけです.
ただし,入力するデータ集合には当然ですが,学習時と同じ前処理を施す必要があります.
ここまでは個々にに処理を適用してきましたが,実用では一連の前処理を一つのPipelineとしてまとめておいた方が良いと思います.
今回だと,カテゴリカル変数のインデクシング,特徴量のベクトル化の二つを前処理として行ったので,次のようにこれを一つのパイプラインにまとめます.
1 |
val pipeline = new Pipeline().setStages(features.toArray :+ assembler) |
これを次のテストデータに適用し,予測値を計算します.
1 2 3 4 5 6 7 |
+------+---+----------+ | name |age|occupation| +------+---+----------+ |Edward| 27| student | | Fox | 25| engineer | | Gille| 25| engineer | +------+---+----------+ |
1 2 3 |
val predictionInputDF = pipeline.fit(testSet).transform(testSet) val resultDF = model.transform(predictionInputDF) resultDF.show() |
1 2 3 4 5 6 7 |
+------+---+----------+----------+---------+----------------+-------------+--------------------+--------------------+----------+ | name |age|occupation|name_index|age_index|occupation_index| features | rawPrediction | probability |prediction| +------+---+----------+----------+---------+----------------+-------------+--------------------+--------------------+----------+ |Edward| 27| student | 1.0 | 1.0 | 1.0 |[1.0,1.0,1.0]|[17.8648754105089...|[0.99999998256655...| 0.0 | | Fox | 25| engineer | 0.0 | 0.0 | 0.0 | (3,[],[]) |[-18.311224465270...|[1.11567127880168...| 1.0 | | Gille| 25| engineer | 2.0 | 0.0 | 0.0 |[2.0,0.0,0.0]|[-17.268510850550...|[3.16505319192869...| 1.0 | +------+---+----------+----------+---------+----------------+-------------+--------------------+--------------------+----------+ |
ここまでで一通り学習,予測まで動作するところまではできていますが,実運用では,モデルを一度作ったらハイパーパラメータをチューニングを行うと思います.SparkMLではこれを楽に行えるようにグリッドサーチを行うAPIも提供されています.
LogisticRegressionモデルだと,パラメータ更新時のイテレーション回数,目的関数の正規化項がグリッドサーチで調整できます.
1 2 3 4 5 6 7 8 9 10 11 |
val bestModel = new CrossValidator() .setEstimator(new LogisticRegression()) .setEvaluator(new BinaryClassificationEvaluator()) .setEstimatorParamMaps( new ParamGridBuilder() .addGrid(model.regParam, Seq(1.0e-4, 1.0e-3, 1.0e-2, 1.0e-1)) // 正規化項のハイパーパラメータ .addGrid(model.maxIter, Seq(50, 100)) // イテレーション回数 .build() ).setNumFolds(3) .fit(df) .bestModel // 最良のモデルを選択 |
これにより指定したハイパーパラメータの組を網羅的に評価した上で最適なモデルを選択することができます.
選択したモデルを予測に利用する手順は前述のものと同様です.
作ったモデルは次のようにファイルに保存することができます.
1 |
bestModel.save("gs://samplebucket/sample") |
この処理を行うとSparkらしくparquetで出力してくれます.ファイルの内容は各モデルクラスに実装されているので,モデルによって異なりますが,Logistic Regressionだと次のような内容でした.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
message spark_schema { required int32 numClasses; required int32 numFeatures; optional group interceptVector { required int32 type (INT_8); optional int32 size; optional group indices (LIST) { repeated group list { required int32 element; } } optional group values (LIST) { repeated group list { required double element; } } } optional group coefficientMatrix { required int32 type (INT_8); required int32 numRows; required int32 numCols; optional group colPtrs (LIST) { repeated group list { required int32 element; } } optional group rowIndices (LIST) { repeated group list { required int32 element; } } optional group values (LIST) { repeated group list { required double element; } } required boolean isTransposed; } required boolean isMultinomial; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
numClasses = 2 numFeatures = 3 interceptVector: .type = 1 .values: ..list: ...element = 18.311224465270886 coefficientMatrix: .type = 1 .numRows = 1 .numCols = 3 .values: ..list: ...element = -0.5213568073602765 ..list: ...element = -35.96680562628096 ..list: ...element = 0.3120625578614303 .isTransposed = true isMultinomial = false |
上述の通り,SparkMLは予測時にも入力としてDataFrameを取るので,複数の入力に対して一括で予測値を計算するというような用途には向いていると思いますが,Bidderのようにリクエストが来るたびに予測値を出さないといけないような状況でそのまま使うのは難しいと感じました.parquetのreaderは様々な言語でライブラリが提供されているので,それを使って自前でモデルの予測計算を実装する必要が出て来そうです.その際,予測側も学習時と同じ前処理を実装しないといけないので,注意しましょう.(この辺りはあまり詳しくないので,良い方法があれば教えてください)
これらの具体的なSparkMLの使い方の話の他にも,モデルの並列学習の方式(Data Parallel, Model Parallel)が,それぞれどういう基盤に向いているのかといった話や,最近の分散機械学習周りで著名なライブラリについても軽く触れていたりと,俯瞰するには良いチュートリアルだったと思います.興味のある方は,ぜひ一度資料の方にも目を通してみてください.また,SparkML周りのサンプルコードもこちらで提供されているので,ぜひご覧ください.
BigDLはカンファレンス全体としてもやや推されているという印象を受けたので,こちらも別の機会に調査してみたいと思いました.
Author