AMP Campをひと通りさらってみる:第6回 RDD
- 第1回 IntroductionとGetting Started
- 第2回 Data Exploration Using Spark
- 第3回 Explore In-Memory Data Store Tachyon
- 第4回 Movie Recommendation with MLlib
- 第5回 Data Exploration Using SparkSQL
Exerciseにはないですが、Spark Programming GuideからRDDを中心に。
RDDのメモ
Resilient Distributed Datasets(RDDs)
Parallerized Collection
- SparkContextのparallelizeメソッドをコレクションに対して呼び出す
- コレクションの要素は並列処理可能な分散データへと形を替えるためにコピーされる
- 重要なパラメータはデータを分けるpartitionの数である
External Dataset
- ストレージにあるデータソースから分散データセットを作ることもできる
- ストレージとしては幅広く対応している
- ファイルフォーマットとしてはHadoopのInputFormat
- 例えば、テキストファイル読み込みはSparkContextのtextfileメソッドでデータソースのURIを記述する
- ファイルを読む場合の注意点
- ローカルのファイルパスを読む場合、全てのworkerノードが同一のパスで参照できなければならない。コピーするか共有ファイルシステムを利用する
- ディレクトリや圧縮ファイル等にも対応
RDD Operations
- RDDは2つのオペレーションをサポートしている
- transformは遅延評価される
- ベースとなるデータセット(例えばfile)を記憶している
- actionが結果を要求するとき(実行されるとき)transformは実行される
- デフォルトではactionが実行される時、毎回transformが実行される
val lines = sc.textFile("data.txt") // 1 val lineLengths = lines.map(s => s.length) // 2 val totalLength = lineLengths.reduce((a, b) => a + b) // 3 lineLengths.persist() // 4
- 1は外部ファイルからRDDを定義している
- 実際の読み込みが行われるわけではない
- linesは単なるファイルへのポインタ
- 2がtransform。各行の長さを求めている
- 遅延評価なのでこのタイミングでは実行されない
- 3がaction。各行の長さを足しあわせている。
- このタイミングでSparkは処理をタスクに分割している
- 分割された処理ごとに各マシンでmapとlocalでのreduce処理を行う
- 4がpersist。
- lineLengthの再計算をしたくない場合はこうすることで保存される
Passing Function to Spark
- SparkのAPIはクラスタ上で動作せるためのdriver program内の関数を渡すことを強く信頼していて(必要としてる?)、推奨される方法は2つ
- 無名関数を渡す
- グローバルなシングルトンオブジェクトの静的メソッドとして渡す
- クラスインスタンスのメソッドへの参照も渡すことができるが、メソッドを含んだクラスオブジェクト全体を渡す必要がある
Working With Key-Value pair
- key-valueのペアになっているRDDだけ使える特別な処理がある
- reduceByKeyとか
- key毎に何か処理をするパターン
RDD Persistent
- Sparkの最も重要な特徴の一つ
- データセットをメモリに保存して各処理で共通して利用できる
- persist()かcache()メソッドを使う
- persistには異なるstrage levelを選ぶことができる。
- persist()を呼ばなくても、reduceByKeyなどのshuffleが発生するオペレーションでは中間データを自動的に保存する
Which Strage level choose?
- メモリの使用量とCPU効率のトレードオフ
Removing Data
- キャッシュ使用状況をLRUで監視して適当に削除する
- 手動で削除したい場合はunpersist()メソッドを呼ぶ
Shared Valiable
- Sparkのoperationがリモートの各クラスターノードで実行される時、関数で使われる全てのデータの部分コピーに対してどうさせている。
- 値は各マシンへコピーされ、リモートマシン上でアップデートは行われずにdriver programに戻される
Broadcast Variables
- タスクとともにコピーを送るのではなく、各マシン上にread-onlyの値を保持することを許す
- Broadcast variableを作成した後、一回以上の呼び出しは行われない
Accumulators
- addされるのみのvariable
- 並列化が効率よくサポートされている
- Sparkは数値型のaccumulatorをサポートしている(新しい型の追加も可能)
AMP Campをひと通りさらってみる:第5回 Data Exploration Using SparkSQL
- 第1回 IntroductionとGetting Started
- 第2回 Data Exploration Using Spark
- 第3回 Explore In-Memory Data Store Tachyon
- 第4回 Movie Recommendation with MLlib
第5回はSparkSQLです。
Data Exploration Using SparkSQLのメモ
- SparkContextオブジェクトをラップしたSQLContextを作成する
- 例ではParquetフォーマットのデータを取り扱う
- Parquetフォーマット化されたWikipediaのデータからberkeleyという文字列を取り出す
- Parquestファイルの読み込み結果の戻り値はSchemaRDD
- SQLの結果は常にRowオブジェクトのArray
- Rowオブジェクトを通じて、各カラムにアクセスできる
SchemaRDDを作ってしまえばSQLが使えるので非常にETL処理が簡単になりますね。結果をまたRDD化して他に処理に渡していけばいいのかな、と。
動画のメモ
- 第2回のものと同じ
AMP Campをひと通りさらってみる:第4回 Movie Recommendation with MLlib
- 第1回 IntroductionとGetting Started
- 第2回 Data Exploration Using Spark
- 第3回 Explore In-Memory Data Store Tachyon
これ順番が動画順なので、厳密にはAMP CampのWebページのメニュー順番とは異なっていますね。。。第4回目はMLlibで一応自分の中では本丸です。
Movie Recommendation with MLlibのメモ
- MovieLensのデータを使ったMLlibのサンプル
Data set
- データセットはGetting Startedより取得できる
Collaborative filtering
Create training examples
Setup
- まずはじめにSparkConfを作成
- ratingを読み込むためにSparkContextを作成
- ratingを(Int, Ratingオブジェクト)のペアに分解
- timestampの数値をランダムなキーとして保持
- Ratingオブジェクトはタプルのラッパー
- ratingと同じようにmovieのidとタイトルを取得している
- 読み込んで、splitしたものをcollectして集めて、map化している。
Running the program
- ratingのレコード数、ユニークユーザー数などの項目を取得するコードを追加
- sbtでビルドして、spark-submitで動かす
自分のパスだと以下の様な感じで
$ cd machine-learning/scala $ ../../spark/bin/spark-submit --class MovieLensALS target/scala-2.10/movielens-als-assembly-0.1.jar ../../data/movielens/medium ../personalRatings.txt Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/01/03 00:39:02 INFO Slf4jLogger: Slf4jLogger started 15/01/03 00:39:02 INFO Remoting: Starting remoting 15/01/03 00:39:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.6:56004] 15/01/03 00:39:03 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.0.6:56004] 2015-01-03 00:39:03.976 java[32045:1903] Unable to load realm info from SCDynamicStore 15/01/03 00:39:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/01/03 00:39:04 WARN LoadSnappy: Snappy native library not loaded 15/01/03 00:39:04 INFO FileInputFormat: Total input paths to process : 1 15/01/03 00:39:05 INFO FileInputFormat: Total input paths to process : 1 Got 1000209 ratings from 6040users on 3706 movies.
という感じで準備オッケー
Splitting training data
- データをtraining, test, validationに分ける
- そのためにtimestampを使ったランダムなキーを残しておいた
Training using ALS
- rankとlambdaというパラメータが重要
- だけど時間の都合上8パターン(rank2パターン、lambda2パターン、繰り返し回数2パターンの組み合わせ23=8パターン)で試す
- RMSEが最小のものを良いモデルとして選択
Recommending movies for you
- 事前に作成した自分のレーティングを基にオススメの映画をレコメンド
Comparing to a naive baseline
- ratingの平均をbaselineとしてどれだけ改善したか
Augmenting matrix factors
- もし興味をもったらMatrixFactorizationModelの実装を見てみましょうね
ALSの参考
レコメンドも前職の途中までは真面目に勉強していましたが約2年くらいブランクがあるので色々おさらいが必要そうです。。。
- カエルでもわかる!Spark / MLlib でやってみる協調フィルタリング(前編)
- カエルでもわかる!Spark / MLlib でやってみる協調フィルタリング(後編)
- Amazon Elastic MapReduceで、Apache Mahoutの分散次元縮約(Parallel ALS)を動かす
- Apache Mahoutの分散次元縮約(Parallel ALS)を解説しよう。
- MatrixFacorization を使った評価予測 ―アルゴリズムシリーズ 3―
動画のメモ
AMP Camp 5: MLlib - Ameet Talwalkar - YouTube
- MLlibの紹介
- MLlib自体はともかく、MLbase/MLpipelie/MLoptというもう少し大きな話があった
- K-meansの話
- 映画のレコメンドを例に挙げた協調フィルタリングの話
- パフォーマンス(AWS上の16nodeの話)
- Mahoutよりも速い(もはやこういったベンチには段々意味がないと思うけど。。。)
- スケーラビリティもMahoutよりもよい
- 50node, 660M users, 2.4M items, 3.5B ratingsが40minuiteで完了
- 1.0から1.1になってパフォーマンスが大幅アップ
- ML pipelineの対応
- 1.2からの対応
- workflowの記述が簡単に
- モデルのチューニングに関する標準的なインターフェースを備える
- ML Dataset = SchemaRDDという感じでSparkSQLと統合を図る
なんとなくScalaのコードも見慣れつつあるので、ソースコードも読める気はしてきています。MLlibはもっと深堀りしていきたいですね。
AMP Campをひと通りさらってみる:第3回 Explore In-Memory Data Store Tachyon
Explore In-Memory Data Store Tachyonのメモ
- Tachyonはworking setのファイルをメモリ上に保持し、異なるジョブ、クエリやフレームワークからキャッシュされたファイルにメモリのスピードでアクセスすることを可能にする
- 頻繁に読まれるデータセットのディスクへのロードを避けることができる
- TachyonはSparkやMapReduceのプログラムをコード上の変更なしに動作させられる互換性がある
- オフヒープ領域を使うので、つまりRDDは自動的にTachyon内にデータを保持することができ、Sparkにより強い耐障害性とGCのオーバーヘッドを避ける事ができる
Configurations
conf/tachyon-env.sh
に変更を加えるTACHYON_WORKER_MEMORY_SIZE
がworkerのメモリサイズ
Format the strage
- ということで例に従ってやってみるもtachyonが起動せず。。。
- が、ここでは本論ではないので時間を使いすぎずにさっと眺めてみることに
- SparkContextにtachyonの設定を追加するだけで普通のSparkと同じように使えます。
- それなのにGCのコストが低減したり、executorがクラッシュした際にもデータは保たれていて素敵、というシロモノのようです。
動画のメモ
AMP Camp 5: Tachyon - Haoyuan Li - YouTube
- メモリはthroughputが指数関数的に伸びている
- 一方で、ディスクにはもう伸びの限界が来ている
- もっとメモリをうまく使うべし
- メモリをうまく使うアプローチは色々ある
- Spark, Impala, HANA, DBM2, etc...
- 問題1:データ処理をパイプラインで繋げる時、データの受け渡しがボトルネックになる
- すなわち、処理と処理との間のデータの受け渡し
- Sparkだと単一のタスクではデータはメモリ上にあるが、別のタスクに渡す際に、一度HDFSやS3を経由するので遅くなる
- 問題2:プロセスがクラッシュするとキャッシュしていたデータが無駄になる
- 問題3:異なるSparkのタスクが同一のデータを使おうとしていた場合、データの重複が起こる
- 問題を解決するために、メモリのスピードでクラスタ上のフレームワーク間でデータを共有するファイルシステムとしてTachyonを作った
余談
oxdataにはH2Oという機械学習ライブラリをオープンソースで公開しています。H2OはSpark対応が行われたSparkling Waterというのがあって、これまでイマイチ理解できていませんでしたが、ここまで学んできたことでようやく全貌を理解出来た気がします。
- RDDを拡張したH2ORDDを提供
- 処理自体はH2O本体のものを利用(この例ではH2OのDeepLearningを利用)
- データ取得部分などはSparkSQL等に任せ、その結果をTachyonに載っけておいて、H2OからはTachyon上のデータを参照する。(Sparkling WaterのスライドのP23-P31)
みたいな感じですね。H2Oはかなり早くからTachyonを利用することを念頭において開発が進められていたということで、なるほどな、と。
余談の参考
AMP Campをひと通りさらってみる:第2回 Data Exploration Using Spark
Data Exploration Using Sparkのメモ
- http://localhost:4040 で管理ツールが確認できる
- 2列目が言語を表しているのでそれを抽出したいという例でcacheを使う
- reduceByKeyの第2引数はreducerの数
- collectメソッドはRDDをArrayに変換している
動画のメモ
AMP Camp 5: SparkSQL - Michael Armbrust - YouTube
- どちらかというとSparkSQL押しな内容
- schemaRDDの話
- SparkSQLは1.2からHive/Json/Panquetなどにも対応。これはけっこう良さそう
- カラム指向でメモリ上にスキーマを保持
Spark Programming Guideのメモ
- Sparkアプリケーションはdriver programから構成され、ユーザーのmain関数といくつかのparallel operatorを実行する
- クラスターのノード上で並行処理ができる分割された要素のコレクションであるRDD(Resilient Distributed Dataset)
- 再利用を目的としてメモリ上にRDDを保持させることができる
- 並列処理時に利用されるshared variables
- デフォルトでは、Sparkは異なるノードの一連のタスクとして並列に関数を動かした時、関数内で利用される各変数のコピーを各タスクに送る
- Sparkがサポートするshared variables
- broadcast variables:全てのノード上のメモリに値をキャッシュさせるために利用される変数
- accumulators:カウンターや合計のように単純に"足し合わせる"ための変数
Linking With Spark
Initializing Spark
- 初めにすべきことはSparkContextオブジェクトを作成すること
- SparkContextはクラスターにどのようにデータにアクセスするかを教える
- SparkContextオブジェクトを作成するに辺り、アプリケーションの情報を含んだSparkConfオブジェクトを作成する
- JVM毎に1つのSparkContextがアクティブにできる
- 新しいSparkContextを作りたければstop()を行わなければならない
- setAppNameにはClusterUI上に表示するアプリケーション名を渡す
- setMasterにはここの値を色々入れる。とりあえずlocal
Using the Shell
- 起動時にscという特別なSparkContextオブジェクトが作られている
RDD
その他
やっぱりRDDがポイントそうですね。
AMP Campをひと通りさらってみる:第1回 IntroductionとGetting Started
AMP Campとは?
AMP CampはBerkeleyのAMP Labによるオープンソースのデータ解析用テクノロジースタックであるBDAS (the Berkeley Data Analytics Stack)のBoot Campみたいです。年1回開催されるようで、すでに2014年開催で5回目のようです。ハンズオンなんかが充実しているのでひと通りさらってみました。
IntroductionとGetting Started
Camp概要と準備
AMP Campの概要です。ざざっと雰囲気が分かりますが、Sparkのエコシステムがひと通りさらえるようです。で、PrerequisitesのAssumptionに"You have experience using the core Spark APIs"とかあって、BootCampのくせに経験者対象かよ、という感じですが、その下にIntroduction to the Scala Shellがありますので、まずはこちらからやりましょう。
SparkはScala/JavaもしくはPythonのインターフェースを備えていますが、Scalaがわかっていた方が何かと便利なはずです。楽せずScalaを見てみます。 順番は前後しますが直前のエントリにて触ってみた話を載せていますので参考までに。
Scala面白いのでもう少し色々と勉強したくなりますが、ぐっとこらえてそのまま進みます。で、読み進めるとGetting Startedをフォローしておけよ、ということで、見てみると、どうやら参加者には必要なソフトウェアやデータ一式が入ったUSBが渡されるようです。Getting Startedのページから一式を取得できるのでゲットしておきます。
下記はremote参加者用のダウンロードURLです。 * software一式 * データ一式
SimpleAppのビルド
software一式はAMPCAMPとか適当な名前のディレクトリを掘って全てそこに入れるようにします。 そして、sbtでビルドします。
$ ../sbt/sbt package Getting org.scala-sbt sbt 0.13.5 ... :: retrieving :: org.scala-sbt#boot-app confs: [default] 44 artifacts copied, 0 already retrieved (13482kB/521ms) Getting Scala 2.10.4 (for sbt)... :: retrieving :: org.scala-sbt#boot-scala confs: [default] 5 artifacts copied, 0 already retrieved (24459kB/142ms) [info] Set current project to Simple Project (in build file:/Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/) [info] Updating {file:/Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/}simple-app... [info] Resolving org.fusesource.jansi#jansi;1.4 ... [info] Done updating. [info] Compiling 1 Scala source to /Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/target/scala-2.10/classes... [info] Packaging /Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/target/scala-2.10/simple-project_2.10-1.0.jar ... [info] Done packaging. [success] Total time: 10 s, completed 2014/12/31 23:35:42
successということで成功ですね。ビルドしたものを実行します。
$ ../spark/bin/spark-submit --class "SimpleApp" --master local target/scala-2.10/simple-project_2.10-1.0.jar Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/12/31 23:36:35 INFO SecurityManager: Changing view acls to: norihiro_shimoda, 14/12/31 23:36:35 INFO SecurityManager: Changing modify acls to: norihiro_shimoda, 14/12/31 23:36:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(norihiro_shimoda, ); users with modify permissions: Set(norihiro_shimoda, ) 14/12/31 23:36:36 INFO Slf4jLogger: Slf4jLogger started 14/12/31 23:36:36 INFO Remoting: Starting remoting 14/12/31 23:36:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.6:60528] ...<中略>... 14/12/31 23:36:38 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 11 ms on localhost (2/2) 14/12/31 23:36:38 INFO DAGScheduler: Stage 1 (count at SimpleApp.scala:13) finished in 0.023 s 14/12/31 23:36:38 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/12/31 23:36:38 INFO SparkContext: Job finished: count at SimpleApp.scala:13, took 0.044037 s Lines with a: 83, Lines with b: 38
という感じで出力されます。何やってるんだろうということでAMPCAMP/simple-app/src/main/scala/SimpleApp.scala
を見てみると、sparkのREADME.mdからaとbを含む行をカウントしているのが分かります。シンプルですね。
という感じでgetting startedは完了です。
動画
- 初回に流れるAMP Labの活動やSpark ecosystemの最近の同行についての動画
- 英語苦手な人でも最新のSpark事情が分かり、見ておいて損はない動画だと思います。
- 自分も得意ではないですが、十分理解できる内容でした。
Scala超入門
SparkのとっかかりとしてたぶんSparkの入門的なものであるAMPCampをさらってみています。
とりあえず、prerequireにあったScalaの超入門的なものをやりました。SparkはPythonでも動かせますが、真面目にやるにあたってはScalaは避けて通れませんので、当面はその辺りの勉強も必要ですね。
例ではScalaShellで動かしていましたが、Scala自体もほぼ初心者なので勉強を兼ねてソースコード化して動かしてみました。 Scalaはまだコップ本を読みつつなので、全然手に馴染んでないし、関数型っぽくうまく書けずまだまだだなぁ、と。2年くらい前にSICPの読書会に参加していた記憶よよみがえれ!
import scala.io.Source object Intro { // 3. cubeというInt型の値の3乗を計算する関数を定義 def cube(n: Int): Int = n * n * n // 6. 階乗を計算する関数を定義する。ループも再帰も使えるが、ここでは再帰を使う def factorial(n: Int): Int = { if (n == 0) 1 else n * factorial(n-1) } // ボーナス:ワードカウントの実装 // まだ良くわかっていないので答えをみた // mutableなHashMap使っているのでScalaらしくはない def wordcount(source: String): collection.mutable.Map[String, Int] = { val lines = Source.fromFile(source).getLines.toArray val counts = new collection.mutable.HashMap[String, Int].withDefaultValue(0) lines.flatMap(line => line.split(" ")).foreach(word => counts(word) += 1) return counts } def main(args: Array[String]) { // 2. myNumberというInt型のリストを定義 val myNumbers = List(1, 2, 3, 4, 5) // 4. 定義した関数をmyNumbersに提供する myNumbers.map(x => cube(x)) // myNumbers.map(cube) こっちのほうがscalaっぽい? // 5. 関数リテラル使って3,4と同じことを実現 myNumbers.map(x => x * x * x) // 6. 階乗を計算する関数を定義する。ループも再帰も使えるが、ここでは再帰を使う // factorialの定義は上部にて println(myNumbers.map(factorial)) // ボーナス:wordcountの実装 // sparkのREADME.mdをカレントディレクトリに持ってきた println(wordcount("README.md")) } }