Strata + Hadoop World参加記録 その2

2daysのチケットしか持っていませんので、本日はexpo hallをうろうろしました。 けっこう広い!あと、日本のイベントと違って会場内でアルコールが飲める!(その分参加費が異常に高い!!)

英語は拙いながら、我らが(?)TreasureDataさんのブースもあったりして中々楽しく回らせてもらいました。

f:id:rindai87:20150219191709j:plain

個人的にimpressiveだった事

「Strata + Hadoop conference」だけどほとんどSpark一色だった

時代の移り変わりなのでしょうか? みんなSparkとかストリーミングとか言ってる感じです。

インメモリでガンガン行こうぜ的な

インメモリでガンガン行こうぜ!という感じのモノがけっこうあった気がします。 その最たるものがmemsqlかな、と。VoltDBとの違い等はまだ整理しきれていませんが、30億レコードをささっとgroup byするデモなどを見ると、なんかそうですよねー、というものを感じます。ちなみに、マネジメントコンソール的なものでその時のマシンを見せてもらいましたが、そこまでのスペックでもなかったのが印象的です。

Tシャツやらノベルティやら

ミーハーごころ丸出しで色々いただきました〜

f:id:rindai87:20150219191652j:plain

  • memsql, TreasureData, Cloudera, MicrosoftのTシャツなどをゲットしました!

後は、DataBricksのCEOのIon Stoicaさんやら Doug Cuttingさんと挨拶させて頂くなどしました。HadoopとSparkのお父さん達に多謝。

Strata + Hadoop World参加記録 その1

色々ありまして、出張にてStrataに参加できることになり、前日にあるmeetupに参加してきました。会社に深い感謝の念を抱きつつ、記録的な何かを残していきます。Strata自体はまだ始まっていないけど同会場の一部でのmeetupだったため、前夜祭って感じが出ていました。

f:id:rindai87:20150218185118j:plain

事前に発表予定だったClouderaの人等が参加できなくなったり、食べ物提供予定がなくなったりということで、400人程度参加表明をしつつ実際は50人もいないくらい?な感じでした。

f:id:rindai87:20150218185132j:plain

内容としては、Spark1.3に入ってくるDataFramesの紹介でしたが、これまじでスゴイですね。 ほぼ同内容がブログでも公開されています。

  • PandasやRのdataframeにインスパイアされたインターフェースで敷居が低い
  • (DataSourceAPIとの関連だと思うけど)多様なデータソースに対応
  • pipelineAPIとのコンボ
  • Sparkそのものが遅延評価で動くことを活かして、裏側ではSparkSQLで利用するCatalystを利用するのでパフォーマンスが良い

ということで、今のところは良いことずくしに見えます。

※DataBricksが公開している検証結果(ブログ記事より抜粋)

という感じでかなり広く受け入れられそうな機能がどんどん増えていて大変素晴らしいという感じを受けています。

とりあえず触ってみないとですね。Spark進化が早くて追いかけるだけでもすごく大変です。せっかく現地にいるので速報っぽいのをやってみたかったので書いてみました。

Sparkの2015年の展望

こんな動画がありました。


What's coming for Spark in 2015 - Bay Area Spark ...

ざっくり言うとSparkの2015年はSparkSQL(SchemaRDD)機械学習部分に力を入れていきそうな感じでしょうか。

ざっくり動画を見た感じだと

  • SaprkSQLが使い物になるのであれば、色々と使い勝手の幅が広がりそう
  • Datasource APIが色々拡充しそうで良い
  • ML Pipelineも非常に興味深い
  • MLlibにもstreamingに対応したものがポツポツ出始めている。例えばStreaming K-meansとか

何となくSchemaRDDを中心とした世界観が見えてきた感じがして引き続き目が話せないなぁ、という感じです。

Kafkaをとりあえず動かす

最近TL等ではKafkaという単語をよく目にするようになってきましたが、kafkaとググっても日本語の情報がほとんどないのが悲しいですね。というわけで本家ドキュメントのQuick Start的なモノを動かしてみた話です。

本当に動かすだけです。が、以外にこのレベルの情報もない!のがKafkaですね。もっとあっても良さそうなのに。。。

Kafkaってなんなのよ、って話は今は触れずにとりあえずKafkaをローカル環境にて動かしてみます。

Apache Kafka

公式ドキュメントのQuick Startに従います。

とりあえず動かす

Kafkaを取ってくる

ミラーサイトからKafkaを取ってきます。最新版は8.2.0のようですが、公式ドキュメントは8.1.1なので、8.1.1を取ってきます。 適当に展開しましょう。

$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
$ tar xzvf kafka_2.9.2-0.8.1.1.tgz

ZooKeeperを起動する

KafkaはZooKeeperが必須のプロダクトとなっていますので、Kafkaを立ち上げる前に、ZooKeeperも起動しておきます。 先ほど取ってきたKafkaのディレクトリにZooKeeperも同梱されているのでテスト的に利用する際はそちらを利用するとよいでしょう。 bin/ディレクトリに各種操作実行用のスクリプトconfig/ディレクトリ以下にKafkaやZooKeeperの設定ファイル一式が入っています。

まずはZooKeeprの起動です。

$ cd kafka_2.9.2-0.8.1.1
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2015-02-05 14:48:29,847] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-02-05 14:48:29,848] WARN Either no config or no quorum defined in config, running  in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
...

Kafka(Broker)を起動する

続いて、Kafkaの起動です。

$ bin/kafka-server-start.sh config/server.properties
[2015-02-05 14:50:55,521] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2015-02-05 14:50:55,560] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
...

topicの作成

続いて、topicを作成します。topicは、Kafkaにおけるメッセージのカテゴリ名のようなものです。メッセージが入る箱みたいなものですね。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-sample
Created topic "kafka-sample".

topicが出来たようです。ついでにtopicを確認してみます。

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
kafka-sample

確かにkafka-sampleが作られています。

データを投入する

同梱されているスクリプトを利用してデータを投入します。ちなみに、Kafkaではデータを投入する役割をProducer、メッセージを受け取る(一時的に保持する)役割がBroker、メッセージを取り出す役割をConsumerと言います。慣れるまではワケワカメですね。最初に立ち上げたKafkaはBrokerに相当します。

同梱しているスクリプトは標準入力を受け取り、Brokerにデータを渡す機能を提供しています。

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-sample
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
message  

という感じです。これでBrokerにメッセージが渡された状態になります。

データを取り出す

Consumer経由でデータを取り出します。こちらも同梱のスクリプトを利用します。

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-sample --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
message

という感じで結果が返ってきます。確かにproducer側で投入したデータが返ってきますね。双方のスクリプトを動かした状態でProducer側で入力した結果は、すぐにConsumer側で表示されます。

AMP Campをひと通りさらってみる:第6回 RDD

Exerciseにはないですが、Spark Programming GuideからRDDを中心に。

RDDのメモ

Resilient Distributed Datasets(RDDs)

  • RDDを作るためには2つの方法がある
    • 既存のコレクションをdriverプログラム内で並列化する
    • 外部ストレージからデータセットを参照する

Parallerized Collection

  • SparkContextのparallelizeメソッドをコレクションに対して呼び出す
    • コレクションの要素は並列処理可能な分散データへと形を替えるためにコピーされる
  • 重要なパラメータはデータを分けるpartitionの数である
    • Sparkでは各々のpartitionに対してクラスタの一つのタスクが実行される
    • 典型的にはクラスタの各CPUに対して2-4のpartitionを臨むであろう
    • Sparkは自動的にpartitionを決めてくれるがparallelizeメソッドの第二引数で明示的に与えることもできる

External Dataset

  • ストレージにあるデータソースから分散データセットを作ることもできる
    • ストレージとしては幅広く対応している
    • ファイルフォーマットとしてはHadoopInputFormat
    • 例えば、テキストファイル読み込みはSparkContextのtextfileメソッドでデータソースのURIを記述する
  • ファイルを読む場合の注意点
    • ローカルのファイルパスを読む場合、全てのworkerノードが同一のパスで参照できなければならない。コピーするか共有ファイルシステムを利用する
    • ディレクトリや圧縮ファイル等にも対応

RDD Operations

  • RDDは2つのオペレーションをサポートしている
    • transform:既存のデータセットから新しいデータセットを作成する
      • 例:mapは各要素に関数を適用して新しいデータセットを作成する
    • action:データセットへの処理を実行した後にdriver programに値を返す
      • 例:reduceは全てのRDDの要素に関数を適用して集約し、結果をdriver programに返す
  • transformは遅延評価される
    • ベースとなるデータセット(例えばfile)を記憶している
    • actionが結果を要求するとき(実行されるとき)transformは実行される
  • デフォルトではactionが実行される時、毎回transformが実行される
    • persist(あるいはcache)を利用することでメモリ上に結果を保存しておくことができる
    • メモリだけでなくディスクや、複数ノードへのレプリケーションもサポートしている
val lines = sc.textFile("data.txt") // 1
val lineLengths = lines.map(s => s.length) // 2
val totalLength = lineLengths.reduce((a, b) => a + b) // 3
lineLengths.persist() // 4
  • 1は外部ファイルからRDDを定義している
    • 実際の読み込みが行われるわけではない
    • linesは単なるファイルへのポインタ
  • 2がtransform。各行の長さを求めている
    • 遅延評価なのでこのタイミングでは実行されない
  • 3がaction。各行の長さを足しあわせている。
    • このタイミングでSparkは処理をタスクに分割している
    • 分割された処理ごとに各マシンでmapとlocalでのreduce処理を行う
  • 4がpersist。
    • lineLengthの再計算をしたくない場合はこうすることで保存される

Passing Function to Spark

  • SparkのAPIクラスタ上で動作せるためのdriver program内の関数を渡すことを強く信頼していて(必要としてる?)、推奨される方法は2つ
    • 無名関数を渡す
    • グローバルなシングルトンオブジェクトの静的メソッドとして渡す
  • クラスインスタンスメソッドへの参照も渡すことができるが、メソッドを含んだクラスオブジェクト全体を渡す必要がある
    • つまり通常のクラス定義でメソッドを書いた場合、内部的にインスタンスを参照するコードになることが多いため、メソッドのみクラスタにばらまくことができないという話かな、と。
    • 避けるためにメソッド内のローカル変数に一度インスタンスのメンバ値をコピーしてから関数を実行させましょう、的な話ですかね

Working With Key-Value pair

  • key-valueのペアになっているRDDだけ使える特別な処理がある
    • reduceByKeyとか
    • key毎に何か処理をするパターン

RDD Persistent

  • Sparkの最も重要な特徴の一つ
    • データセットをメモリに保存して各処理で共通して利用できる
  • persist()かcache()メソッドを使う
  • persistには異なるstrage levelを選ぶことができる。
    • ディスクへの保存
    • メモリに保存するうがJavaのオブジェクトとしてシリアライズされた状態で保存する
    • 各ノードにレプリケーションする
    • Tachyon上のオフヒープ領域に保存する
  • persist()を呼ばなくても、reduceByKeyなどのshuffleが発生するオペレーションでは中間データを自動的に保存する

Which Strage level choose?

  • メモリの使用量とCPU効率のトレードオフ
    • MOMORY_ONLYで問題ないサイズのRDDならそのままで良い。CPU効率が最も良いオプション
    • MEMORY_ONLYが辛ければMEMORY_ONLY_SER(RDDシリアライズされたJavaのオブジェクトとして保持する。デシリアライズ分効率は落ちるがメモリ空間を上手く使える)
    • レプリケーションをおおなうstrage levelは障害復旧を早くしたい場合に再計算がなくなってよい

Removing Data

  • キャッシュ使用状況をLRUで監視して適当に削除する
    • 手動で削除したい場合はunpersist()メソッドを呼ぶ

Shared Valiable

  • Sparkのoperationがリモートの各クラスターノードで実行される時、関数で使われる全てのデータの部分コピーに対してどうさせている。
    • 値は各マシンへコピーされ、リモートマシン上でアップデートは行われずにdriver programに戻される

Broadcast Variables

  • タスクとともにコピーを送るのではなく、各マシン上にread-onlyの値を保持することを許す
    • 効率的な方法で大きな入力データのコピーを各ノードに与えることを可能にする
    • SparkContextのbroadcastメソッドでbroadcast variableを作ることができる。valueメソッドで元の値の取り出し
  • Broadcast variableを作成した後、一回以上の呼び出しは行われない

Accumulators

  • addされるのみのvariable
    • 並列化が効率よくサポートされている
    • Sparkは数値型のaccumulatorをサポートしている(新しい型の追加も可能)

AMP Campをひと通りさらってみる:第5回 Data Exploration Using SparkSQL

第5回はSparkSQLです。

Data Exploration Using SparkSQLのメモ

  • SparkContextオブジェクトをラップしたSQLContextを作成する
  • 例ではParquetフォーマットのデータを取り扱う
    • Parquetフォーマット化されたWikipediaのデータからberkeleyという文字列を取り出す
  • Parquestファイルの読み込み結果の戻り値はSchemaRDD
    • SchemaRDDは通常のRDDと同じ関数を全て持っている
    • カラムの名前や型などの情報を保持しているため、テーブル登録を行った後にSQLクエリを投げることができるようになる
  • SQLの結果は常にRowオブジェクトのArray
    • Rowオブジェクトを通じて、各カラムにアクセスできる

SchemaRDDを作ってしまえばSQLが使えるので非常にETL処理が簡単になりますね。結果をまたRDD化して他に処理に渡していけばいいのかな、と。

動画のメモ

  • 第2回のものと同じ

AMP Campをひと通りさらってみる:第4回 Movie Recommendation with MLlib

これ順番が動画順なので、厳密にはAMP CampのWebページのメニュー順番とは異なっていますね。。。第4回目はMLlibで一応自分の中では本丸です。

Movie Recommendation with MLlibのメモ

  • MovieLensのデータを使ったMLlibのサンプル

Data set

Collaborative filtering

  • 協調フィルタリングの説明
    • 色々あるけど、好みにまつわる隠れた要因を見つけ出す手法の一つであるALS(Alternating Least Squares)をMLlibは実装しているのでそれを利用する

Create training examples

  • 自分の評価結果を作成するPythonスクリプトが用意されていてそれを利用する
    • 推薦に使われる

Setup

  • まずはじめにSparkConfを作成
  • ratingを読み込むためにSparkContextを作成
  • ratingを(Int, Ratingオブジェクト)のペアに分解
    • timestampの数値をランダムなキーとして保持
    • Ratingオブジェクトはタプルのラッパー
  • ratingと同じようにmovieのidとタイトルを取得している
    • 読み込んで、splitしたものをcollectして集めて、map化している。

Running the program

  • ratingのレコード数、ユニークユーザー数などの項目を取得するコードを追加
  • sbtでビルドして、spark-submitで動かす

自分のパスだと以下の様な感じで

$ cd machine-learning/scala
$ ../../spark/bin/spark-submit --class MovieLensALS target/scala-2.10/movielens-als-assembly-0.1.jar ../../data/movielens/medium ../personalRatings.txt

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/01/03 00:39:02 INFO Slf4jLogger: Slf4jLogger started
15/01/03 00:39:02 INFO Remoting: Starting remoting
15/01/03 00:39:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.0.6:56004]
15/01/03 00:39:03 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@192.168.0.6:56004]
2015-01-03 00:39:03.976 java[32045:1903] Unable to load realm info from SCDynamicStore
15/01/03 00:39:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/03 00:39:04 WARN LoadSnappy: Snappy native library not loaded
15/01/03 00:39:04 INFO FileInputFormat: Total input paths to process : 1
15/01/03 00:39:05 INFO FileInputFormat: Total input paths to process : 1
Got 1000209 ratings from 6040users on 3706 movies.

という感じで準備オッケー

Splitting training data

  • データをtraining, test, validationに分ける
    • そのためにtimestampを使ったランダムなキーを残しておいた

Training using ALS

  • rankとlambdaというパラメータが重要
    • だけど時間の都合上8パターン(rank2パターン、lambda2パターン、繰り返し回数2パターンの組み合わせ23=8パターン)で試す
    • RMSEが最小のものを良いモデルとして選択

Recommending movies for you

  • 事前に作成した自分のレーティングを基にオススメの映画をレコメンド

Comparing to a naive baseline

  • ratingの平均をbaselineとしてどれだけ改善したか

Augmenting matrix factors

  • もし興味をもったらMatrixFactorizationModelの実装を見てみましょうね

ALSの参考

レコメンドも前職の途中までは真面目に勉強していましたが約2年くらいブランクがあるので色々おさらいが必要そうです。。。

動画のメモ


AMP Camp 5: MLlib - Ameet Talwalkar - YouTube

  • MLlibの紹介
    • MLlib自体はともかく、MLbase/MLpipelie/MLoptというもう少し大きな話があった
  • K-meansの話
  • 映画のレコメンドを例に挙げた協調フィルタリングの話
    • 欠損値がある
    • レーティングはより小さな要因から決定されていると仮定
      • user/movie factorに分解
    • ALSはユーザーはuser, movieの双方をアップデートしていく
      • 並列にアップデートできる
    • ALSを使ったレコメンドがハンズオンの内容
  • パフォーマンス(AWS上の16nodeの話)
    • Mahoutよりも速い(もはやこういったベンチには段々意味がないと思うけど。。。)
    • スケーラビリティもMahoutよりもよい
    • 50node, 660M users, 2.4M items, 3.5B ratingsが40minuiteで完了
  • 1.0から1.1になってパフォーマンスが大幅アップ
  • ML pipelineの対応
    • 1.2からの対応
    • workflowの記述が簡単に
    • モデルのチューニングに関する標準的なインターフェースを備える
  • ML Dataset = SchemaRDDという感じでSparkSQLと統合を図る

なんとなくScalaのコードも見慣れつつあるので、ソースコードも読める気はしてきています。MLlibはもっと深堀りしていきたいですね。