AMP Campをひと通りさらってみる:第6回 RDD
- 第1回 IntroductionとGetting Started
- 第2回 Data Exploration Using Spark
- 第3回 Explore In-Memory Data Store Tachyon
- 第4回 Movie Recommendation with MLlib
- 第5回 Data Exploration Using SparkSQL
Exerciseにはないですが、Spark Programming GuideからRDDを中心に。
RDDのメモ
Resilient Distributed Datasets(RDDs)
Parallerized Collection
- SparkContextのparallelizeメソッドをコレクションに対して呼び出す
- コレクションの要素は並列処理可能な分散データへと形を替えるためにコピーされる
- 重要なパラメータはデータを分けるpartitionの数である
External Dataset
- ストレージにあるデータソースから分散データセットを作ることもできる
- ストレージとしては幅広く対応している
- ファイルフォーマットとしてはHadoopのInputFormat
- 例えば、テキストファイル読み込みはSparkContextのtextfileメソッドでデータソースのURIを記述する
- ファイルを読む場合の注意点
- ローカルのファイルパスを読む場合、全てのworkerノードが同一のパスで参照できなければならない。コピーするか共有ファイルシステムを利用する
- ディレクトリや圧縮ファイル等にも対応
RDD Operations
- RDDは2つのオペレーションをサポートしている
- transformは遅延評価される
- ベースとなるデータセット(例えばfile)を記憶している
- actionが結果を要求するとき(実行されるとき)transformは実行される
- デフォルトではactionが実行される時、毎回transformが実行される
val lines = sc.textFile("data.txt") // 1 val lineLengths = lines.map(s => s.length) // 2 val totalLength = lineLengths.reduce((a, b) => a + b) // 3 lineLengths.persist() // 4
- 1は外部ファイルからRDDを定義している
- 実際の読み込みが行われるわけではない
- linesは単なるファイルへのポインタ
- 2がtransform。各行の長さを求めている
- 遅延評価なのでこのタイミングでは実行されない
- 3がaction。各行の長さを足しあわせている。
- このタイミングでSparkは処理をタスクに分割している
- 分割された処理ごとに各マシンでmapとlocalでのreduce処理を行う
- 4がpersist。
- lineLengthの再計算をしたくない場合はこうすることで保存される
Passing Function to Spark
- SparkのAPIはクラスタ上で動作せるためのdriver program内の関数を渡すことを強く信頼していて(必要としてる?)、推奨される方法は2つ
- 無名関数を渡す
- グローバルなシングルトンオブジェクトの静的メソッドとして渡す
- クラスインスタンスのメソッドへの参照も渡すことができるが、メソッドを含んだクラスオブジェクト全体を渡す必要がある
Working With Key-Value pair
- key-valueのペアになっているRDDだけ使える特別な処理がある
- reduceByKeyとか
- key毎に何か処理をするパターン
RDD Persistent
- Sparkの最も重要な特徴の一つ
- データセットをメモリに保存して各処理で共通して利用できる
- persist()かcache()メソッドを使う
- persistには異なるstrage levelを選ぶことができる。
- persist()を呼ばなくても、reduceByKeyなどのshuffleが発生するオペレーションでは中間データを自動的に保存する
Which Strage level choose?
- メモリの使用量とCPU効率のトレードオフ
Removing Data
- キャッシュ使用状況をLRUで監視して適当に削除する
- 手動で削除したい場合はunpersist()メソッドを呼ぶ
Shared Valiable
- Sparkのoperationがリモートの各クラスターノードで実行される時、関数で使われる全てのデータの部分コピーに対してどうさせている。
- 値は各マシンへコピーされ、リモートマシン上でアップデートは行われずにdriver programに戻される
Broadcast Variables
- タスクとともにコピーを送るのではなく、各マシン上にread-onlyの値を保持することを許す
- Broadcast variableを作成した後、一回以上の呼び出しは行われない
Accumulators
- addされるのみのvariable
- 並列化が効率よくサポートされている
- Sparkは数値型のaccumulatorをサポートしている(新しい型の追加も可能)