AMP Campをひと通りさらってみる:第6回 RDD

Exerciseにはないですが、Spark Programming GuideからRDDを中心に。

RDDのメモ

Resilient Distributed Datasets(RDDs)

  • RDDを作るためには2つの方法がある
    • 既存のコレクションをdriverプログラム内で並列化する
    • 外部ストレージからデータセットを参照する

Parallerized Collection

  • SparkContextのparallelizeメソッドをコレクションに対して呼び出す
    • コレクションの要素は並列処理可能な分散データへと形を替えるためにコピーされる
  • 重要なパラメータはデータを分けるpartitionの数である
    • Sparkでは各々のpartitionに対してクラスタの一つのタスクが実行される
    • 典型的にはクラスタの各CPUに対して2-4のpartitionを臨むであろう
    • Sparkは自動的にpartitionを決めてくれるがparallelizeメソッドの第二引数で明示的に与えることもできる

External Dataset

  • ストレージにあるデータソースから分散データセットを作ることもできる
    • ストレージとしては幅広く対応している
    • ファイルフォーマットとしてはHadoopInputFormat
    • 例えば、テキストファイル読み込みはSparkContextのtextfileメソッドでデータソースのURIを記述する
  • ファイルを読む場合の注意点
    • ローカルのファイルパスを読む場合、全てのworkerノードが同一のパスで参照できなければならない。コピーするか共有ファイルシステムを利用する
    • ディレクトリや圧縮ファイル等にも対応

RDD Operations

  • RDDは2つのオペレーションをサポートしている
    • transform:既存のデータセットから新しいデータセットを作成する
      • 例:mapは各要素に関数を適用して新しいデータセットを作成する
    • action:データセットへの処理を実行した後にdriver programに値を返す
      • 例:reduceは全てのRDDの要素に関数を適用して集約し、結果をdriver programに返す
  • transformは遅延評価される
    • ベースとなるデータセット(例えばfile)を記憶している
    • actionが結果を要求するとき(実行されるとき)transformは実行される
  • デフォルトではactionが実行される時、毎回transformが実行される
    • persist(あるいはcache)を利用することでメモリ上に結果を保存しておくことができる
    • メモリだけでなくディスクや、複数ノードへのレプリケーションもサポートしている
val lines = sc.textFile("data.txt") // 1
val lineLengths = lines.map(s => s.length) // 2
val totalLength = lineLengths.reduce((a, b) => a + b) // 3
lineLengths.persist() // 4
  • 1は外部ファイルからRDDを定義している
    • 実際の読み込みが行われるわけではない
    • linesは単なるファイルへのポインタ
  • 2がtransform。各行の長さを求めている
    • 遅延評価なのでこのタイミングでは実行されない
  • 3がaction。各行の長さを足しあわせている。
    • このタイミングでSparkは処理をタスクに分割している
    • 分割された処理ごとに各マシンでmapとlocalでのreduce処理を行う
  • 4がpersist。
    • lineLengthの再計算をしたくない場合はこうすることで保存される

Passing Function to Spark

  • SparkのAPIクラスタ上で動作せるためのdriver program内の関数を渡すことを強く信頼していて(必要としてる?)、推奨される方法は2つ
    • 無名関数を渡す
    • グローバルなシングルトンオブジェクトの静的メソッドとして渡す
  • クラスインスタンスメソッドへの参照も渡すことができるが、メソッドを含んだクラスオブジェクト全体を渡す必要がある
    • つまり通常のクラス定義でメソッドを書いた場合、内部的にインスタンスを参照するコードになることが多いため、メソッドのみクラスタにばらまくことができないという話かな、と。
    • 避けるためにメソッド内のローカル変数に一度インスタンスのメンバ値をコピーしてから関数を実行させましょう、的な話ですかね

Working With Key-Value pair

  • key-valueのペアになっているRDDだけ使える特別な処理がある
    • reduceByKeyとか
    • key毎に何か処理をするパターン

RDD Persistent

  • Sparkの最も重要な特徴の一つ
    • データセットをメモリに保存して各処理で共通して利用できる
  • persist()かcache()メソッドを使う
  • persistには異なるstrage levelを選ぶことができる。
    • ディスクへの保存
    • メモリに保存するうがJavaのオブジェクトとしてシリアライズされた状態で保存する
    • 各ノードにレプリケーションする
    • Tachyon上のオフヒープ領域に保存する
  • persist()を呼ばなくても、reduceByKeyなどのshuffleが発生するオペレーションでは中間データを自動的に保存する

Which Strage level choose?

  • メモリの使用量とCPU効率のトレードオフ
    • MOMORY_ONLYで問題ないサイズのRDDならそのままで良い。CPU効率が最も良いオプション
    • MEMORY_ONLYが辛ければMEMORY_ONLY_SER(RDDシリアライズされたJavaのオブジェクトとして保持する。デシリアライズ分効率は落ちるがメモリ空間を上手く使える)
    • レプリケーションをおおなうstrage levelは障害復旧を早くしたい場合に再計算がなくなってよい

Removing Data

  • キャッシュ使用状況をLRUで監視して適当に削除する
    • 手動で削除したい場合はunpersist()メソッドを呼ぶ

Shared Valiable

  • Sparkのoperationがリモートの各クラスターノードで実行される時、関数で使われる全てのデータの部分コピーに対してどうさせている。
    • 値は各マシンへコピーされ、リモートマシン上でアップデートは行われずにdriver programに戻される

Broadcast Variables

  • タスクとともにコピーを送るのではなく、各マシン上にread-onlyの値を保持することを許す
    • 効率的な方法で大きな入力データのコピーを各ノードに与えることを可能にする
    • SparkContextのbroadcastメソッドでbroadcast variableを作ることができる。valueメソッドで元の値の取り出し
  • Broadcast variableを作成した後、一回以上の呼び出しは行われない

Accumulators

  • addされるのみのvariable
    • 並列化が効率よくサポートされている
    • Sparkは数値型のaccumulatorをサポートしている(新しい型の追加も可能)

AMP Campをひと通りさらってみる:第5回 Data Exploration Using SparkSQL

第5回はSparkSQLです。

Data Exploration Using SparkSQLのメモ

  • SparkContextオブジェクトをラップしたSQLContextを作成する
  • 例ではParquetフォーマットのデータを取り扱う
    • Parquetフォーマット化されたWikipediaのデータからberkeleyという文字列を取り出す
  • Parquestファイルの読み込み結果の戻り値はSchemaRDD
    • SchemaRDDは通常のRDDと同じ関数を全て持っている
    • カラムの名前や型などの情報を保持しているため、テーブル登録を行った後にSQLクエリを投げることができるようになる
  • SQLの結果は常にRowオブジェクトのArray
    • Rowオブジェクトを通じて、各カラムにアクセスできる

SchemaRDDを作ってしまえばSQLが使えるので非常にETL処理が簡単になりますね。結果をまたRDD化して他に処理に渡していけばいいのかな、と。

動画のメモ

  • 第2回のものと同じ

AMP Campをひと通りさらってみる:第4回 Movie Recommendation with MLlib

これ順番が動画順なので、厳密にはAMP CampのWebページのメニュー順番とは異なっていますね。。。第4回目はMLlibで一応自分の中では本丸です。

Movie Recommendation with MLlibのメモ

  • MovieLensのデータを使ったMLlibのサンプル

Data set

Collaborative filtering

  • 協調フィルタリングの説明
    • 色々あるけど、好みにまつわる隠れた要因を見つけ出す手法の一つであるALS(Alternating Least Squares)をMLlibは実装しているのでそれを利用する

Create training examples

  • 自分の評価結果を作成するPythonスクリプトが用意されていてそれを利用する
    • 推薦に使われる

Setup

  • まずはじめにSparkConfを作成
  • ratingを読み込むためにSparkContextを作成
  • ratingを(Int, Ratingオブジェクト)のペアに分解
    • timestampの数値をランダムなキーとして保持
    • Ratingオブジェクトはタプルのラッパー
  • ratingと同じようにmovieのidとタイトルを取得している
    • 読み込んで、splitしたものをcollectして集めて、map化している。

Running the program

  • ratingのレコード数、ユニークユーザー数などの項目を取得するコードを追加
  • sbtでビルドして、spark-submitで動かす

自分のパスだと以下の様な感じで

$ cd machine-learning/scala
$ ../../spark/bin/spark-submit --class MovieLensALS target/scala-2.10/movielens-als-assembly-0.1.jar ../../data/movielens/medium ../personalRatings.txt

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/01/03 00:39:02 INFO Slf4jLogger: Slf4jLogger started
15/01/03 00:39:02 INFO Remoting: Starting remoting
15/01/03 00:39:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.6:56004]
15/01/03 00:39:03 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.0.6:56004]
2015-01-03 00:39:03.976 java[32045:1903] Unable to load realm info from SCDynamicStore
15/01/03 00:39:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/03 00:39:04 WARN LoadSnappy: Snappy native library not loaded
15/01/03 00:39:04 INFO FileInputFormat: Total input paths to process : 1
15/01/03 00:39:05 INFO FileInputFormat: Total input paths to process : 1
Got 1000209 ratings from 6040users on 3706 movies.

という感じで準備オッケー

Splitting training data

  • データをtraining, test, validationに分ける
    • そのためにtimestampを使ったランダムなキーを残しておいた

Training using ALS

  • rankとlambdaというパラメータが重要
    • だけど時間の都合上8パターン(rank2パターン、lambda2パターン、繰り返し回数2パターンの組み合わせ23=8パターン)で試す
    • RMSEが最小のものを良いモデルとして選択

Recommending movies for you

  • 事前に作成した自分のレーティングを基にオススメの映画をレコメンド

Comparing to a naive baseline

  • ratingの平均をbaselineとしてどれだけ改善したか

Augmenting matrix factors

  • もし興味をもったらMatrixFactorizationModelの実装を見てみましょうね

ALSの参考

レコメンドも前職の途中までは真面目に勉強していましたが約2年くらいブランクがあるので色々おさらいが必要そうです。。。

動画のメモ


AMP Camp 5: MLlib - Ameet Talwalkar - YouTube

  • MLlibの紹介
    • MLlib自体はともかく、MLbase/MLpipelie/MLoptというもう少し大きな話があった
  • K-meansの話
  • 映画のレコメンドを例に挙げた協調フィルタリングの話
    • 欠損値がある
    • レーティングはより小さな要因から決定されていると仮定
      • user/movie factorに分解
    • ALSはユーザーはuser, movieの双方をアップデートしていく
      • 並列にアップデートできる
    • ALSを使ったレコメンドがハンズオンの内容
  • パフォーマンス(AWS上の16nodeの話)
    • Mahoutよりも速い(もはやこういったベンチには段々意味がないと思うけど。。。)
    • スケーラビリティもMahoutよりもよい
    • 50node, 660M users, 2.4M items, 3.5B ratingsが40minuiteで完了
  • 1.0から1.1になってパフォーマンスが大幅アップ
  • ML pipelineの対応
    • 1.2からの対応
    • workflowの記述が簡単に
    • モデルのチューニングに関する標準的なインターフェースを備える
  • ML Dataset = SchemaRDDという感じでSparkSQLと統合を図る

なんとなくScalaのコードも見慣れつつあるので、ソースコードも読める気はしてきています。MLlibはもっと深堀りしていきたいですね。

AMP Campをひと通りさらってみる:第3回 Explore In-Memory Data Store Tachyon

Explore In-Memory Data Store Tachyonのメモ

  • Tachyonはworking setのファイルをメモリ上に保持し、異なるジョブ、クエリやフレームワークからキャッシュされたファイルにメモリのスピードでアクセスすることを可能にする
    • 頻繁に読まれるデータセットのディスクへのロードを避けることができる
  • TachyonはSparkやMapReduceのプログラムをコード上の変更なしに動作させられる互換性がある
  • オフヒープ領域を使うので、つまりRDDは自動的にTachyon内にデータを保持することができ、Sparkにより強い耐障害性とGCのオーバーヘッドを避ける事ができる

Configurations

  • conf/tachyon-env.shに変更を加える
    • TACHYON_WORKER_MEMORY_SIZEがworkerのメモリサイズ

Format the strage

  • ということで例に従ってやってみるもtachyonが起動せず。。。
    • が、ここでは本論ではないので時間を使いすぎずにさっと眺めてみることに
  • SparkContextにtachyonの設定を追加するだけで普通のSparkと同じように使えます。
    • それなのにGCのコストが低減したり、executorがクラッシュした際にもデータは保たれていて素敵、というシロモノのようです。

動画のメモ


AMP Camp 5: Tachyon - Haoyuan Li - YouTube

  • メモリはthroughputが指数関数的に伸びている
    • 一方で、ディスクにはもう伸びの限界が来ている
    • もっとメモリをうまく使うべし
  • メモリをうまく使うアプローチは色々ある
    • Spark, Impala, HANA, DBM2, etc...
  • 問題1:データ処理をパイプラインで繋げる時、データの受け渡しがボトルネックになる
    • すなわち、処理と処理との間のデータの受け渡し
    • Sparkだと単一のタスクではデータはメモリ上にあるが、別のタスクに渡す際に、一度HDFSやS3を経由するので遅くなる
  • 問題2:プロセスがクラッシュするとキャッシュしていたデータが無駄になる
  • 問題3:異なるSparkのタスクが同一のデータを使おうとしていた場合、データの重複が起こる
  • 問題を解決するために、メモリのスピードでクラスタ上のフレームワーク間でデータを共有するファイルシステムとしてTachyonを作った

余談

oxdataにはH2Oという機械学習ライブラリをオープンソースで公開しています。H2OはSpark対応が行われたSparkling Waterというのがあって、これまでイマイチ理解できていませんでしたが、ここまで学んできたことでようやく全貌を理解出来た気がします。

  • RDDを拡張したH2ORDDを提供
  • 処理自体はH2O本体のものを利用(この例ではH2OのDeepLearningを利用)
  • データ取得部分などはSparkSQL等に任せ、その結果をTachyonに載っけておいて、H2OからはTachyon上のデータを参照する。(Sparkling WaterのスライドのP23-P31)

みたいな感じですね。H2Oはかなり早くからTachyonを利用することを念頭において開発が進められていたということで、なるほどな、と。

余談の参考

  • DataBricks BlogのSparkling Warterに関する記事
  • OxdataによるSparkling Warterのスライド

AMP Campをひと通りさらってみる:第2回 Data Exploration Using Spark

Data Exploration Using Sparkのメモ

  • http://localhost:4040 で管理ツールが確認できる
  • 2列目が言語を表しているのでそれを抽出したいという例でcacheを使う
  • reduceByKeyの第2引数はreducerの数
  • collectメソッドRDDをArrayに変換している

動画のメモ


AMP Camp 5: SparkSQL - Michael Armbrust - YouTube

  • どちらかというとSparkSQL押しな内容
  • schemaRDDの話
  • SparkSQLは1.2からHive/Json/Panquetなどにも対応。これはけっこう良さそう
  • カラム指向でメモリ上にスキーマを保持

Spark Programming Guideのメモ

  • Sparkアプリケーションはdriver programから構成され、ユーザーのmain関数といくつかのparallel operatorを実行する
  • クラスターのノード上で並行処理ができる分割された要素のコレクションであるRDD(Resilient Distributed Dataset)
    • 再利用を目的としてメモリ上にRDDを保持させることができる
  • 並列処理時に利用されるshared variables
    • デフォルトでは、Sparkは異なるノードの一連のタスクとして並列に関数を動かした時、関数内で利用される各変数のコピーを各タスクに送る
  • Sparkがサポートするshared variables
    • broadcast variables:全てのノード上のメモリに値をキャッシュさせるために利用される変数
    • accumulators:カウンターや合計のように単純に"足し合わせる"ための変数

Linking With Spark

  • Mavenの設定が必要
    • HDFSつかうときは追加でMavenの登録が必要

Initializing Spark

  • 初めにすべきことはSparkContextオブジェクトを作成すること
    • SparkContextはクラスターにどのようにデータにアクセスするかを教える
  • SparkContextオブジェクトを作成するに辺り、アプリケーションの情報を含んだSparkConfオブジェクトを作成する
  • JVM毎に1つのSparkContextがアクティブにできる
    • 新しいSparkContextを作りたければstop()を行わなければならない
  • setAppNameにはClusterUI上に表示するアプリケーション名を渡す
  • setMasterにはここの値を色々入れる。とりあえずlocal

Using the Shell

  • 起動時にscという特別なSparkContextオブジェクトが作られている

RDD

  • RDDここでかなり分量を使って書かれているので、また後でしっかり見てみます。

その他

  • standaloneでのプログラムの書き方なども学びましょうということで第一回のAMP Campの資料が参照されている

やっぱりRDDがポイントそうですね。

AMP Campをひと通りさらってみる:第1回 IntroductionとGetting Started

AMP Campとは?

AMP CampはBerkeleyのAMP Labによるオープンソースのデータ解析用テクノロジースタックであるBDAS (the Berkeley Data Analytics Stack)のBoot Campみたいです。年1回開催されるようで、すでに2014年開催で5回目のようです。ハンズオンなんかが充実しているのでひと通りさらってみました。

IntroductionとGetting Started

Camp概要と準備

AMP Campの概要です。ざざっと雰囲気が分かりますが、Sparkのエコシステムがひと通りさらえるようです。で、PrerequisitesのAssumptionに"You have experience using the core Spark APIs"とかあって、BootCampのくせに経験者対象かよ、という感じですが、その下にIntroduction to the Scala Shellがありますので、まずはこちらからやりましょう。

SparkはScala/JavaもしくはPythonのインターフェースを備えていますが、Scalaがわかっていた方が何かと便利なはずです。楽せずScalaを見てみます。 順番は前後しますが直前のエントリにて触ってみた話を載せていますので参考までに。

Scala面白いのでもう少し色々と勉強したくなりますが、ぐっとこらえてそのまま進みます。で、読み進めるとGetting Startedをフォローしておけよ、ということで、見てみると、どうやら参加者には必要なソフトウェアやデータ一式が入ったUSBが渡されるようです。Getting Startedのページから一式を取得できるのでゲットしておきます。

下記はremote参加者用のダウンロードURLです。 * software一式 * データ一式

SimpleAppのビルド

software一式はAMPCAMPとか適当な名前のディレクトリを掘って全てそこに入れるようにします。 そして、sbtでビルドします。

$ ../sbt/sbt package
Getting org.scala-sbt sbt 0.13.5 ...
:: retrieving :: org.scala-sbt#boot-app
    confs: [default]
    44 artifacts copied, 0 already retrieved (13482kB/521ms)
Getting Scala 2.10.4 (for sbt)...
:: retrieving :: org.scala-sbt#boot-scala
    confs: [default]
    5 artifacts copied, 0 already retrieved (24459kB/142ms)
[info] Set current project to Simple Project (in build file:/Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/)
[info] Updating {file:/Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/}simple-app...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 1 Scala source to /Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/target/scala-2.10/classes...
[info] Packaging /Users/norihiro_shimoda/Work/study/spark/AMPCAMP/simple-app/target/scala-2.10/simple-project_2.10-1.0.jar ...
[info] Done packaging.
[success] Total time: 10 s, completed 2014/12/31 23:35:42

successということで成功ですね。ビルドしたものを実行します。

$ ../spark/bin/spark-submit --class "SimpleApp" --master local target/scala-2.10/simple-project_2.10-1.0.jar 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/12/31 23:36:35 INFO SecurityManager: Changing view acls to: norihiro_shimoda,
14/12/31 23:36:35 INFO SecurityManager: Changing modify acls to: norihiro_shimoda,
14/12/31 23:36:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(norihiro_shimoda, ); users with modify permissions: Set(norihiro_shimoda, )
14/12/31 23:36:36 INFO Slf4jLogger: Slf4jLogger started
14/12/31 23:36:36 INFO Remoting: Starting remoting
14/12/31 23:36:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.6:60528]
...<中略>...
14/12/31 23:36:38 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 11 ms on localhost (2/2)
14/12/31 23:36:38 INFO DAGScheduler: Stage 1 (count at SimpleApp.scala:13) finished in 0.023 s
14/12/31 23:36:38 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/12/31 23:36:38 INFO SparkContext: Job finished: count at SimpleApp.scala:13, took 0.044037 s
Lines with a: 83, Lines with b: 38

という感じで出力されます。何やってるんだろうということでAMPCAMP/simple-app/src/main/scala/SimpleApp.scalaを見てみると、sparkのREADME.mdからaとbを含む行をカウントしているのが分かります。シンプルですね。

という感じでgetting startedは完了です。

動画

  • 初回に流れるAMP Labの活動やSpark ecosystemの最近の同行についての動画
    • 英語苦手な人でも最新のSpark事情が分かり、見ておいて損はない動画だと思います。
    • 自分も得意ではないですが、十分理解できる内容でした。


AMP Camp 5: Intro &amp; Overview - Michael Franklin ...

Scala超入門

SparkのとっかかりとしてたぶんSparkの入門的なものであるAMPCampをさらってみています。

とりあえず、prerequireにあったScalaの超入門的なものをやりました。SparkはPythonでも動かせますが、真面目にやるにあたってはScalaは避けて通れませんので、当面はその辺りの勉強も必要ですね。

例ではScalaShellで動かしていましたが、Scala自体もほぼ初心者なので勉強を兼ねてソースコード化して動かしてみました。 Scalaはまだコップ本を読みつつなので、全然手に馴染んでないし、関数型っぽくうまく書けずまだまだだなぁ、と。2年くらい前にSICPの読書会に参加していた記憶よよみがえれ!

import scala.io.Source

object Intro {
  // 3. cubeというInt型の値の3乗を計算する関数を定義
  def cube(n: Int): Int = n * n * n

  // 6. 階乗を計算する関数を定義する。ループも再帰も使えるが、ここでは再帰を使う
  def factorial(n: Int): Int = {
    if (n == 0) 1 else n * factorial(n-1)
  } 
  
  // ボーナス:ワードカウントの実装
  // まだ良くわかっていないので答えをみた
  // mutableなHashMap使っているのでScalaらしくはない
  def wordcount(source: String): collection.mutable.Map[String, Int] = {
    val lines = Source.fromFile(source).getLines.toArray
    val counts = new collection.mutable.HashMap[String, Int].withDefaultValue(0)
    lines.flatMap(line => line.split(" ")).foreach(word => counts(word) += 1)
    return counts
  } 

  def main(args: Array[String]) {
    // 2. myNumberというInt型のリストを定義
    val myNumbers = List(1, 2, 3, 4, 5)
    
    // 4. 定義した関数をmyNumbersに提供する
    myNumbers.map(x => cube(x))
    // myNumbers.map(cube) こっちのほうがscalaっぽい?
    
    // 5. 関数リテラル使って3,4と同じことを実現
    myNumbers.map(x => x * x * x)

    // 6. 階乗を計算する関数を定義する。ループも再帰も使えるが、ここでは再帰を使う
    // factorialの定義は上部にて
    println(myNumbers.map(factorial))
    
    // ボーナス:wordcountの実装
    // sparkのREADME.mdをカレントディレクトリに持ってきた
    println(wordcount("README.md"))
  }
}