AMP Campをひと通りさらってみる:第6回 RDD

Exerciseにはないですが、Spark Programming GuideからRDDを中心に。

RDDのメモ

Resilient Distributed Datasets(RDDs)

  • RDDを作るためには2つの方法がある
    • 既存のコレクションをdriverプログラム内で並列化する
    • 外部ストレージからデータセットを参照する

Parallerized Collection

  • SparkContextのparallelizeメソッドをコレクションに対して呼び出す
    • コレクションの要素は並列処理可能な分散データへと形を替えるためにコピーされる
  • 重要なパラメータはデータを分けるpartitionの数である
    • Sparkでは各々のpartitionに対してクラスタの一つのタスクが実行される
    • 典型的にはクラスタの各CPUに対して2-4のpartitionを臨むであろう
    • Sparkは自動的にpartitionを決めてくれるがparallelizeメソッドの第二引数で明示的に与えることもできる

External Dataset

  • ストレージにあるデータソースから分散データセットを作ることもできる
    • ストレージとしては幅広く対応している
    • ファイルフォーマットとしてはHadoopInputFormat
    • 例えば、テキストファイル読み込みはSparkContextのtextfileメソッドでデータソースのURIを記述する
  • ファイルを読む場合の注意点
    • ローカルのファイルパスを読む場合、全てのworkerノードが同一のパスで参照できなければならない。コピーするか共有ファイルシステムを利用する
    • ディレクトリや圧縮ファイル等にも対応

RDD Operations

  • RDDは2つのオペレーションをサポートしている
    • transform:既存のデータセットから新しいデータセットを作成する
      • 例:mapは各要素に関数を適用して新しいデータセットを作成する
    • action:データセットへの処理を実行した後にdriver programに値を返す
      • 例:reduceは全てのRDDの要素に関数を適用して集約し、結果をdriver programに返す
  • transformは遅延評価される
    • ベースとなるデータセット(例えばfile)を記憶している
    • actionが結果を要求するとき(実行されるとき)transformは実行される
  • デフォルトではactionが実行される時、毎回transformが実行される
    • persist(あるいはcache)を利用することでメモリ上に結果を保存しておくことができる
    • メモリだけでなくディスクや、複数ノードへのレプリケーションもサポートしている
val lines = sc.textFile("data.txt") // 1
val lineLengths = lines.map(s => s.length) // 2
val totalLength = lineLengths.reduce((a, b) => a + b) // 3
lineLengths.persist() // 4
  • 1は外部ファイルからRDDを定義している
    • 実際の読み込みが行われるわけではない
    • linesは単なるファイルへのポインタ
  • 2がtransform。各行の長さを求めている
    • 遅延評価なのでこのタイミングでは実行されない
  • 3がaction。各行の長さを足しあわせている。
    • このタイミングでSparkは処理をタスクに分割している
    • 分割された処理ごとに各マシンでmapとlocalでのreduce処理を行う
  • 4がpersist。
    • lineLengthの再計算をしたくない場合はこうすることで保存される

Passing Function to Spark

  • SparkのAPIクラスタ上で動作せるためのdriver program内の関数を渡すことを強く信頼していて(必要としてる?)、推奨される方法は2つ
    • 無名関数を渡す
    • グローバルなシングルトンオブジェクトの静的メソッドとして渡す
  • クラスインスタンスメソッドへの参照も渡すことができるが、メソッドを含んだクラスオブジェクト全体を渡す必要がある
    • つまり通常のクラス定義でメソッドを書いた場合、内部的にインスタンスを参照するコードになることが多いため、メソッドのみクラスタにばらまくことができないという話かな、と。
    • 避けるためにメソッド内のローカル変数に一度インスタンスのメンバ値をコピーしてから関数を実行させましょう、的な話ですかね

Working With Key-Value pair

  • key-valueのペアになっているRDDだけ使える特別な処理がある
    • reduceByKeyとか
    • key毎に何か処理をするパターン

RDD Persistent

  • Sparkの最も重要な特徴の一つ
    • データセットをメモリに保存して各処理で共通して利用できる
  • persist()かcache()メソッドを使う
  • persistには異なるstrage levelを選ぶことができる。
    • ディスクへの保存
    • メモリに保存するうがJavaのオブジェクトとしてシリアライズされた状態で保存する
    • 各ノードにレプリケーションする
    • Tachyon上のオフヒープ領域に保存する
  • persist()を呼ばなくても、reduceByKeyなどのshuffleが発生するオペレーションでは中間データを自動的に保存する

Which Strage level choose?

  • メモリの使用量とCPU効率のトレードオフ
    • MOMORY_ONLYで問題ないサイズのRDDならそのままで良い。CPU効率が最も良いオプション
    • MEMORY_ONLYが辛ければMEMORY_ONLY_SER(RDDシリアライズされたJavaのオブジェクトとして保持する。デシリアライズ分効率は落ちるがメモリ空間を上手く使える)
    • レプリケーションをおおなうstrage levelは障害復旧を早くしたい場合に再計算がなくなってよい

Removing Data

  • キャッシュ使用状況をLRUで監視して適当に削除する
    • 手動で削除したい場合はunpersist()メソッドを呼ぶ

Shared Valiable

  • Sparkのoperationがリモートの各クラスターノードで実行される時、関数で使われる全てのデータの部分コピーに対してどうさせている。
    • 値は各マシンへコピーされ、リモートマシン上でアップデートは行われずにdriver programに戻される

Broadcast Variables

  • タスクとともにコピーを送るのではなく、各マシン上にread-onlyの値を保持することを許す
    • 効率的な方法で大きな入力データのコピーを各ノードに与えることを可能にする
    • SparkContextのbroadcastメソッドでbroadcast variableを作ることができる。valueメソッドで元の値の取り出し
  • Broadcast variableを作成した後、一回以上の呼び出しは行われない

Accumulators

  • addされるのみのvariable
    • 並列化が効率よくサポートされている
    • Sparkは数値型のaccumulatorをサポートしている(新しい型の追加も可能)