Hadoop streamingを利用してRでMapReduce
色々と調べてみた
良くまとまっていて面白かったです。
Rの並列化の現状について : wrong, rogue and log
RHIPE(R and Hadoop Integrated Processing Environment)
RHIPE: R and Hadoop Integrated Processing Environment
Hadoop streamingを使ってRでMapReduce
R言語で MapReduce −Hadoop Streaming− - hamadakoichi blog
https://www.rmetrics.org/files/Meielisalp2009/Presentations/Theussl1.pdf
というわけで、今回はどんぴしゃのHadoop streamingを試してみることにします。
Hadoop streamingは簡単に言うと標準入出力を利用して、
入力 => Rによるmap => 中間結果 => Rによるreduce => 出力
をHadoop上で行うというものです。
とりあえずスタンドアロンモードで試します。
試してみる
R言語で MapReduce −Hadoop Streaming− - hamadakoichi blogを元に試してみます。
ほぼ参考サイトそのままです。
mapper.r
#!/usr/bin/Rscript con = file(description="stdin",open="r") while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) { line <- unlist(strsplit(line, "\t")) for(word in line){ cat(sprintf("%s\t%s\n", word, 1), sep = "") } } close(con)
reducer.r
#!/usr/bin/Rscript env <- new.env(hash = TRUE) con <- file("stdin", open = "r") while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) { line <- unlist(strsplit(line, "\t")) key <- line[1] value <- as.integer(line[2]) if (exists(key, envir = env, inherits = FALSE)) { count <- get(key, envir = env) assign(key, count + value, envir = env) } else { assign(key, value, envir = env) } } close(con) for (key in ls(env, all = TRUE)) { cat(key, "\t", get(key, envir = env), "\n", sep = " ") }
分析用のファイル
test.data
a b c d e f g g d c a a a d d
上記ファイルを適当に${HADOOP_HOME}/Rとかディレクトリを作っていれておく
$ cd ${HADOOP_HOME} $ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -files mapper.r, reducer.r -input R/test.data -output R/output -mapper mapper.r -reducer reducer.r 11/04/24 00:32:22 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 11/04/24 00:32:22 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 11/04/24 00:32:22 INFO mapred.FileInputFormat: Total input paths to process : 1 11/04/24 00:32:22 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local] 11/04/24 00:32:22 INFO streaming.StreamJob: Running job: job_local_0001 11/04/24 00:32:22 INFO streaming.StreamJob: Job running in-process (local Hadoop) 11/04/24 00:32:22 INFO mapred.FileInputFormat: Total input paths to process : 1 11/04/24 00:32:23 INFO mapred.MapTask: numReduceTasks: 1 11/04/24 00:32:23 INFO mapred.MapTask: io.sort.mb = 100 11/04/24 00:32:23 INFO mapred.MapTask: data buffer = 79691776/99614720 11/04/24 00:32:23 INFO mapred.MapTask: record buffer = 262144/327680 11/04/24 00:32:23 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/local/hadoop/./R/mapper.r] 11/04/24 00:32:23 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s] 11/04/24 00:32:23 INFO streaming.StreamJob: map 0% reduce 0% 11/04/24 00:32:23 INFO streaming.PipeMapRed: Records R/W=4/1 11/04/24 00:32:24 INFO streaming.PipeMapRed: MROutputThread done 11/04/24 00:32:24 INFO streaming.PipeMapRed: MRErrorThread done 11/04/24 00:32:24 INFO streaming.PipeMapRed: mapRedFinished 11/04/24 00:32:24 INFO mapred.MapTask: Starting flush of map output 11/04/24 00:32:24 INFO mapred.MapTask: Finished spill 0 11/04/24 00:32:24 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 11/04/24 00:32:24 INFO mapred.LocalJobRunner: Records R/W=4/1 11/04/24 00:32:24 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done. 11/04/24 00:32:24 INFO mapred.LocalJobRunner: 11/04/24 00:32:24 INFO mapred.Merger: Merging 1 sorted segments 11/04/24 00:32:24 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 92 bytes 11/04/24 00:32:24 INFO mapred.LocalJobRunner: 11/04/24 00:32:24 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/local/hadoop/./R/reducer.r] 11/04/24 00:32:24 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s] 11/04/24 00:32:24 INFO streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s] 11/04/24 00:32:24 INFO streaming.PipeMapRed: MRErrorThread done 11/04/24 00:32:24 INFO streaming.PipeMapRed: Records R/W=15/1 11/04/24 00:32:24 INFO streaming.PipeMapRed: MROutputThread done 11/04/24 00:32:24 INFO streaming.PipeMapRed: mapRedFinished 11/04/24 00:32:24 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 11/04/24 00:32:24 INFO mapred.LocalJobRunner: 11/04/24 00:32:24 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now 11/04/24 00:32:24 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/usr/local/hadoop/R/output 11/04/24 00:32:24 INFO mapred.LocalJobRunner: Records R/W=15/1 > reduce 11/04/24 00:32:24 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done. 11/04/24 00:32:24 INFO streaming.StreamJob: map 100% reduce 100% 11/04/24 00:32:24 INFO streaming.StreamJob: Job complete: job_local_0001 11/04/24 00:32:24 INFO streaming.StreamJob: Output: R/output
おおお
動きました!
${HADOOP_HOME}/R/outputにはpart-00000ができています。
a 4 b 1 c 2 d 4 e 1 f 1 g 2
と集計されていますね。
ともあれ、何をしているかきちんと理解しないと積み上げにはなりません。
Rのあれこれも忘れているので一つずつ見て行きます。
やっていること
mapper.rでは、入力ファイルを読みこんで、行毎に\tで分割。
{文字:1}というkey:valueの形式を標準出力へ
reducer.rでは、環境envを用意(この辺りよく分かってません)。
Rでハッシュを使うための工夫のような気がしています。
標準入力から、mapの結果の{key:value}を受け取ってパース。
envにワードのハッシュがあればカウントを+1する。
なければ新規に1を登録。
というかたちで、ワードのカウント結果をenvに記録していく。
という流れです。
最終的には"単語"\t"カウント数"\nという形で標準出力へ
Hadoopに添付されているサンプルコードの
${HADOOP_HOME}/src/examples/org/apache/hadoop/examples/WordCount.java
を見ると絶望的な気持ちになりますが、Hadoop streamingで書くと非常にすっきりしますね。
ハッシュをもっとすっきりと書ける言語だとさらに見通しは良くなりそうです。
MapとReduceを慣れ親しんだ言語で書けるのは非常に楽ですね。
Hadoop | |
Tom White 玉川 竜司 オライリージャパン 2010-01-25 売り上げランキング : 39100 Amazonで詳しく見る by G-Tools |
以下、Rの関数を色々と調べた過程の残骸
せっかくなので残しておきます。
con = file(description="stdin",open="r")
Functions to create, open and close connections.
descriptioncharacter string. A description of the connection: see ‘Details’. Use "stdin" to refer to the C-level ‘standard input’ of the process (which need not be connected to anything in a console or embedded version of R). See also stdin() for the subtly different R-level concept ofstdin.標準入力をオープン
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0)
Read some or all text lines from a connection.
cona connection object or a character string.ninteger. The (maximal) number of lines to read. Negative values indicate that one should read up to the end of input on the connection.warnlogical. Warn if a text file is missing a final EOL. 標準入力から一行ずつ読んで、入力がなくなるまでループ
env <- new.env(hash = TRUE)
Get, set, test for and create environments.
hasha logical, if TRUE the environment will use a hash table. new.env returns a new (empty) environment with (by default) enclosure the parent frame.ハッシュテーブルを使って環境を作成?
環境がちょっとわかんない><
if (exists(key, envir = env, inherits = FALSE))
Look for an R object of the given name.
xa variable name (given as a character string). enviran alternative way to specify an environment to look in, but it is usually simpler to just use the where argument.
inheritshould the enclosing frames of the environment be searched? keyがenvに存在しているかを調べる
assign(key, count + value, envir = env)
Assign a value to a name in an environment.
xa variable name, given as a character string. No coercion is done, and the first element of a character vector of length greater than one will be used, with a warning.valuea value to be assigned to x. envirthe environment to use. See the details section.
envにあるkeyという変数にcout+valueを割り当てる
for (key in ls(env, all = TRUE))
ls and objects return a vector of character strings giving the names of the objects in the specified environment. When invoked with no argument at the top level prompt, ls shows what data sets and functions a user has defined. When invoked with no argument inside a function, ls returns the names of the functions local variables. This is useful in conjunction with browser.
namewhich environment to use in listing the available objects. Defaults to the current environment. Although called name for back compatibility, in fact this argument can specify the environment in any form; see the details section. enviran alternative argument to name for specifying the environment. Mostly there for back compatibility.all.namesa logical value. If TRUE, all object names are returned. If FALSE, names which begin with a . are omitted.
env内の全ての名前を返して、keyに渡してループ
cat(key, "\t", get(key, envir = env), "\n", sep = " ")
Outputs the objects, concatenating the representations. cat performs much less conversion than print.
Search for an R object with a given name and return it....R objects (see ‘Details’ for the types of objects allowed).
Search for an R object with a given name and return it.
key \t (envのkeyに割り当てられられている値) \nで出力