Strata + Hadoop World参加記録 その4
今日は自分的に楽しそうなセッションが盛りだくさんですね。メモはけっこう適当です。
Big Data at Netflix: Faster and Easier
- バックエンドにHDFSは使っておらずS3を利用している
- 400 Billion Events / Day
- ビデオの再生とかUIに対するアクションとか
- Data Platform(High level)
- Why Presto (vs. Alternatives)
- Spark
- 良い所
- 2013年から使ってるけど当時はイマイチだった
- 限定的な状態ではあるがproductionで使われている
- 関連プロダクトがぎゅっと集まっている
- パフォーマンス(Sparkは100倍といわれているがそこまでではない)
- コミュニティ
- 未来がありそう
- 悪い所
- まだ歴史が浅い
- マルチテナンシーがない
- shuffling
- ハイレベルなAPIのためチューニングがやや困難
- その他あれこれお
- 良い所
- Streaming Processing
- 一貫性と可用性
- LatencyとScaling
- Storm, Samza, Sparkとあるが何を選ぶべきか。。。
- Parquet
- カラムナ型
- Pig on TEZ
- Kafka
- suro
- druid
- genie
- inviz
- lipstick
- metacat
- Bigdataportl
Kragle
Netflixのculture
全てはカルチャーのなせる技な気がしました。
Tuning and Debugging in Apache Spark
席がうまく確保出来ず、メモを取る余裕もありませんでした。パーティションが大事、はとりあえず覚えているのでまた復習的な何かが必要ですね。あとは、やっぱりshuffleがきついのでここをいかに回避するかが一つのポイント、みたいな感じでしょうか?shuffleについては、この日の午後に一つセッションを設けて話されていました。
YARN vs. MESOS: Can’t We All Just Get Along?
MesosとYarnをmanageするMyriadの話でした。まだまだこれからのプロジェクトだと思いますが、MapRが主導していくとのことなので、個人的にはDrillのように急激に伸びてくるんだろうなぁと思っています。
Everyday I’m Shuffling - Tips for Writing Better Spark Programs
- shuffleの話:非効率化の原因
- driver or workerのどちらでデータを動かすか:エラーの原因
- Reduce By Key vs GroupByKeyの比較
- 結果は同じ
- reduceByKeyの方が効率的
- shuffleの問題から起きる
- shuffle前に同じキーで処理してからshuffleするのがreduceByKeyだから
- 大きなテーブルと小さなテーブルの結合(SpqrkSQL)
- shuffledHashJoin
- BroadcastHashJoin
- 小さな方のデータをBroadCastする
- How to Configure BroadcastHashJoin
- 1.2から設定できる。
- Set spark.sql.autoBroadcastJoinThreshold
- Join a Medium Table with a Huge Table
- Left Join
- ...
- Left Join
- shuffleの問題を見つける
- main programはDriverで実行
- transformはworkerで実行
- collect()を大きなRDDで呼ぶとOOMKillerで死ぬ
と、出来る限りメモリましたが、不完全なことこの上なく。。。同行者の方から、この手の話はgithub上にも公開されてるよー、と教えていただいたので合わせてshareです。
その他
今日はDatabricksとMapRのTシャツをゲット!Tシャツが累計10枚くらいになりました。。。
というわけで、長いようで短かったようなStrata出張終わりです。色々消化しきれていない部分を早く消化しないとなー、という感じです。余談ですが、Strataへの参加層として、中国人、インド人はすごく多そうでしたが、やはり日本からはほとんど参加されていないんですね。
Strata + Hadoop World参加記録 その3
今日からはセッションに参加できます。なにげに参加していなかったのですが、キーノートセッションにオバマのビデオが流れたとのこと!
President Barack Obama's Big Data Keynote ...
国のトップがこういうメッセージを出すってすごいインパクトですよね。。。
その他セッションを色々回りながら見ていました。
Spark Streamingの話
Spark Streaming - The State of the Union, and Beyond
にて、PMCのTathagata Das の話を聞くなどしてきました。
SparkSQLやSparkCore、MLlibとの連携の話や、Spark1.3ではStreaming LDAなどが実装される話、MLlibとの連携強化等が話題にのぼっており、大変興味深い感じでした。
NetflixがStreaming処理系をSparkStreamingにリプレースしている話話や、Pinterestでも使われているよー、みたいな話が共有され、Spark Streamingへの高まりを感じる次第です。
自分の興味がある部分だからかわかりませんが、Streaming関連の情報がけっこう多かったかな、と。
本日の戦利品はStrataの公式Tシャツ、AerospikeのTシャツ、BashoのTシャツです。Bashoはステッカーももらいましたし、ロゴがかわいいですね。
明日は楽しみなセッションが多めなのでさらに期待が高まります。
Strata + Hadoop World参加記録 その2
2daysのチケットしか持っていませんので、本日はexpo hallをうろうろしました。 けっこう広い!あと、日本のイベントと違って会場内でアルコールが飲める!(その分参加費が異常に高い!!)
英語は拙いながら、我らが(?)TreasureDataさんのブースもあったりして中々楽しく回らせてもらいました。
個人的にimpressiveだった事
「Strata + Hadoop conference」だけどほとんどSpark一色だった
時代の移り変わりなのでしょうか? みんなSparkとかストリーミングとか言ってる感じです。
インメモリでガンガン行こうぜ的な
インメモリでガンガン行こうぜ!という感じのモノがけっこうあった気がします。 その最たるものがmemsqlかな、と。VoltDBとの違い等はまだ整理しきれていませんが、30億レコードをささっとgroup byするデモなどを見ると、なんかそうですよねー、というものを感じます。ちなみに、マネジメントコンソール的なものでその時のマシンを見せてもらいましたが、そこまでのスペックでもなかったのが印象的です。
Tシャツやらノベルティやら
ミーハーごころ丸出しで色々いただきました〜
- memsql, TreasureData, Cloudera, MicrosoftのTシャツなどをゲットしました!
後は、DataBricksのCEOのIon Stoicaさんやら Doug Cuttingさんと挨拶させて頂くなどしました。HadoopとSparkのお父さん達に多謝。
Strata + Hadoop World参加記録 その1
色々ありまして、出張にてStrataに参加できることになり、前日にあるmeetupに参加してきました。会社に深い感謝の念を抱きつつ、記録的な何かを残していきます。Strata自体はまだ始まっていないけど同会場の一部でのmeetupだったため、前夜祭って感じが出ていました。
事前に発表予定だったClouderaの人等が参加できなくなったり、食べ物提供予定がなくなったりということで、400人程度参加表明をしつつ実際は50人もいないくらい?な感じでした。
内容としては、Spark1.3に入ってくるDataFramesの紹介でしたが、これまじでスゴイですね。 ほぼ同内容がブログでも公開されています。
- PandasやRのdataframeにインスパイアされたインターフェースで敷居が低い
- (DataSourceAPIとの関連だと思うけど)多様なデータソースに対応
- pipelineAPIとのコンボ
- Sparkそのものが遅延評価で動くことを活かして、裏側ではSparkSQLで利用するCatalystを利用するのでパフォーマンスが良い
ということで、今のところは良いことずくしに見えます。
※DataBricksが公開している検証結果(ブログ記事より抜粋)
という感じでかなり広く受け入れられそうな機能がどんどん増えていて大変素晴らしいという感じを受けています。
とりあえず触ってみないとですね。Spark進化が早くて追いかけるだけでもすごく大変です。せっかく現地にいるので速報っぽいのをやってみたかったので書いてみました。
Sparkの2015年の展望
こんな動画がありました。
What's coming for Spark in 2015 - Bay Area Spark ...
ざっくり言うとSparkの2015年はSparkSQL(SchemaRDD)と機械学習部分に力を入れていきそうな感じでしょうか。
ざっくり動画を見た感じだと
- SaprkSQLが使い物になるのであれば、色々と使い勝手の幅が広がりそう
- Datasource APIが色々拡充しそうで良い
- ML Pipelineも非常に興味深い
- MLlibにもstreamingに対応したものがポツポツ出始めている。例えばStreaming K-meansとか
何となくSchemaRDDを中心とした世界観が見えてきた感じがして引き続き目が話せないなぁ、という感じです。
Kafkaをとりあえず動かす
最近TL等ではKafkaという単語をよく目にするようになってきましたが、kafkaとググっても日本語の情報がほとんどないのが悲しいですね。というわけで本家ドキュメントのQuick Start的なモノを動かしてみた話です。
本当に動かすだけです。が、以外にこのレベルの情報もない!のがKafkaですね。もっとあっても良さそうなのに。。。
Kafkaってなんなのよ、って話は今は触れずにとりあえずKafkaをローカル環境にて動かしてみます。
公式ドキュメントのQuick Startに従います。
とりあえず動かす
Kafkaを取ってくる
ミラーサイトからKafkaを取ってきます。最新版は8.2.0のようですが、公式ドキュメントは8.1.1なので、8.1.1を取ってきます。 適当に展開しましょう。
$ wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz $ tar xzvf kafka_2.9.2-0.8.1.1.tgz
ZooKeeperを起動する
KafkaはZooKeeperが必須のプロダクトとなっていますので、Kafkaを立ち上げる前に、ZooKeeperも起動しておきます。
先ほど取ってきたKafkaのディレクトリにZooKeeperも同梱されているのでテスト的に利用する際はそちらを利用するとよいでしょう。
bin/
ディレクトリに各種操作実行用のスクリプト、config/
ディレクトリ以下にKafkaやZooKeeperの設定ファイル一式が入っています。
まずはZooKeeprの起動です。
$ cd kafka_2.9.2-0.8.1.1 $ bin/zookeeper-server-start.sh config/zookeeper.properties [2015-02-05 14:48:29,847] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2015-02-05 14:48:29,848] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain) ...
Kafka(Broker)を起動する
続いて、Kafkaの起動です。
$ bin/kafka-server-start.sh config/server.properties [2015-02-05 14:50:55,521] INFO Verifying properties (kafka.utils.VerifiableProperties) [2015-02-05 14:50:55,560] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties) ...
topicの作成
続いて、topicを作成します。topicは、Kafkaにおけるメッセージのカテゴリ名のようなものです。メッセージが入る箱みたいなものですね。
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-sample Created topic "kafka-sample".
topicが出来たようです。ついでにtopicを確認してみます。
$ bin/kafka-topics.sh --list --zookeeper localhost:2181 kafka-sample
確かにkafka-sampleが作られています。
データを投入する
同梱されているスクリプトを利用してデータを投入します。ちなみに、Kafkaではデータを投入する役割をProducer、メッセージを受け取る(一時的に保持する)役割がBroker、メッセージを取り出す役割をConsumerと言います。慣れるまではワケワカメですね。最初に立ち上げたKafkaはBrokerに相当します。
同梱しているスクリプトは標準入力を受け取り、Brokerにデータを渡す機能を提供しています。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-sample SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. test message
という感じです。これでBrokerにメッセージが渡された状態になります。
データを取り出す
Consumer経由でデータを取り出します。こちらも同梱のスクリプトを利用します。
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-sample --from-beginning SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. test message
という感じで結果が返ってきます。確かにproducer側で投入したデータが返ってきますね。双方のスクリプトを動かした状態でProducer側で入力した結果は、すぐにConsumer側で表示されます。
AMP Campをひと通りさらってみる:第6回 RDD
- 第1回 IntroductionとGetting Started
- 第2回 Data Exploration Using Spark
- 第3回 Explore In-Memory Data Store Tachyon
- 第4回 Movie Recommendation with MLlib
- 第5回 Data Exploration Using SparkSQL
Exerciseにはないですが、Spark Programming GuideからRDDを中心に。
RDDのメモ
Resilient Distributed Datasets(RDDs)
Parallerized Collection
- SparkContextのparallelizeメソッドをコレクションに対して呼び出す
- コレクションの要素は並列処理可能な分散データへと形を替えるためにコピーされる
- 重要なパラメータはデータを分けるpartitionの数である
External Dataset
- ストレージにあるデータソースから分散データセットを作ることもできる
- ストレージとしては幅広く対応している
- ファイルフォーマットとしてはHadoopのInputFormat
- 例えば、テキストファイル読み込みはSparkContextのtextfileメソッドでデータソースのURIを記述する
- ファイルを読む場合の注意点
- ローカルのファイルパスを読む場合、全てのworkerノードが同一のパスで参照できなければならない。コピーするか共有ファイルシステムを利用する
- ディレクトリや圧縮ファイル等にも対応
RDD Operations
- RDDは2つのオペレーションをサポートしている
- transformは遅延評価される
- ベースとなるデータセット(例えばfile)を記憶している
- actionが結果を要求するとき(実行されるとき)transformは実行される
- デフォルトではactionが実行される時、毎回transformが実行される
val lines = sc.textFile("data.txt") // 1 val lineLengths = lines.map(s => s.length) // 2 val totalLength = lineLengths.reduce((a, b) => a + b) // 3 lineLengths.persist() // 4
- 1は外部ファイルからRDDを定義している
- 実際の読み込みが行われるわけではない
- linesは単なるファイルへのポインタ
- 2がtransform。各行の長さを求めている
- 遅延評価なのでこのタイミングでは実行されない
- 3がaction。各行の長さを足しあわせている。
- このタイミングでSparkは処理をタスクに分割している
- 分割された処理ごとに各マシンでmapとlocalでのreduce処理を行う
- 4がpersist。
- lineLengthの再計算をしたくない場合はこうすることで保存される
Passing Function to Spark
- SparkのAPIはクラスタ上で動作せるためのdriver program内の関数を渡すことを強く信頼していて(必要としてる?)、推奨される方法は2つ
- 無名関数を渡す
- グローバルなシングルトンオブジェクトの静的メソッドとして渡す
- クラスインスタンスのメソッドへの参照も渡すことができるが、メソッドを含んだクラスオブジェクト全体を渡す必要がある
Working With Key-Value pair
- key-valueのペアになっているRDDだけ使える特別な処理がある
- reduceByKeyとか
- key毎に何か処理をするパターン
RDD Persistent
- Sparkの最も重要な特徴の一つ
- データセットをメモリに保存して各処理で共通して利用できる
- persist()かcache()メソッドを使う
- persistには異なるstrage levelを選ぶことができる。
- persist()を呼ばなくても、reduceByKeyなどのshuffleが発生するオペレーションでは中間データを自動的に保存する
Which Strage level choose?
- メモリの使用量とCPU効率のトレードオフ
Removing Data
- キャッシュ使用状況をLRUで監視して適当に削除する
- 手動で削除したい場合はunpersist()メソッドを呼ぶ
Shared Valiable
- Sparkのoperationがリモートの各クラスターノードで実行される時、関数で使われる全てのデータの部分コピーに対してどうさせている。
- 値は各マシンへコピーされ、リモートマシン上でアップデートは行われずにdriver programに戻される
Broadcast Variables
- タスクとともにコピーを送るのではなく、各マシン上にread-onlyの値を保持することを許す
- Broadcast variableを作成した後、一回以上の呼び出しは行われない
Accumulators
- addされるのみのvariable
- 並列化が効率よくサポートされている
- Sparkは数値型のaccumulatorをサポートしている(新しい型の追加も可能)