Hadoop streamingを利用してRでMapReduce

動機

MapReduceJava以外の言語で使えると楽なので、調べて試してみました。

色々と調べてみた

良くまとまっていて面白かったです。
Rの並列化の現状について : wrong, rogue and log


RHIPE(R and Hadoop Integrated Processing Environment)
RHIPE: R and Hadoop Integrated Processing Environment


Hadoop streamingを使ってRでMapReduce
R言語で MapReduce −Hadoop Streaming− - hamadakoichi blog
https://www.rmetrics.org/files/Meielisalp2009/Presentations/Theussl1.pdf


というわけで、今回はどんぴしゃのHadoop streamingを試してみることにします。
Hadoop streamingは簡単に言うと標準入出力を利用して、
入力 => Rによるmap => 中間結果 => Rによるreduce => 出力
Hadoop上で行うというものです。
とりあえずスタンドアロンモードで試します。

試してみる

R言語で MapReduce −Hadoop Streaming− - hamadakoichi blogを元に試してみます。
ほぼ参考サイトそのままです。

mapper.r

#!/usr/bin/Rscript

con = file(description="stdin",open="r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        line <- unlist(strsplit(line, "\t"))
        for(word in line){
                cat(sprintf("%s\t%s\n", word, 1), sep = "")
        }
}
close(con)

reducer.r

#!/usr/bin/Rscript

env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        line <- unlist(strsplit(line, "\t"))
        key <- line[1]
        value <- as.integer(line[2])
        if (exists(key, envir = env, inherits = FALSE)) {
                count <- get(key, envir = env)
                assign(key, count + value, envir = env)
        }
        else {
                assign(key, value, envir = env)
        }
}
close(con)

for (key in ls(env, all = TRUE)) {
        cat(key, "\t", get(key, envir = env), "\n", sep = " ")
}

分析用のファイル
test.data

a       b       c
d       e       f       g
g       d       c       a
a       a       d       d

上記ファイルを適当に${HADOOP_HOME}/Rとかディレクトリを作っていれておく

$ cd ${HADOOP_HOME}
$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -files mapper.r, reducer.r -input R/test.data -output R/output -mapper mapper.r -reducer reducer.r

11/04/24 00:32:22 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/04/24 00:32:22 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
11/04/24 00:32:22 INFO mapred.FileInputFormat: Total input paths to process : 1
11/04/24 00:32:22 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
11/04/24 00:32:22 INFO streaming.StreamJob: Running job: job_local_0001
11/04/24 00:32:22 INFO streaming.StreamJob: Job running in-process (local Hadoop)
11/04/24 00:32:22 INFO mapred.FileInputFormat: Total input paths to process : 1
11/04/24 00:32:23 INFO mapred.MapTask: numReduceTasks: 1
11/04/24 00:32:23 INFO mapred.MapTask: io.sort.mb = 100
11/04/24 00:32:23 INFO mapred.MapTask: data buffer = 79691776/99614720
11/04/24 00:32:23 INFO mapred.MapTask: record buffer = 262144/327680
11/04/24 00:32:23 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/local/hadoop/./R/mapper.r]
11/04/24 00:32:23 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
11/04/24 00:32:23 INFO streaming.StreamJob:  map 0%  reduce 0%
11/04/24 00:32:23 INFO streaming.PipeMapRed: Records R/W=4/1
11/04/24 00:32:24 INFO streaming.PipeMapRed: MROutputThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: MRErrorThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: mapRedFinished
11/04/24 00:32:24 INFO mapred.MapTask: Starting flush of map output
11/04/24 00:32:24 INFO mapred.MapTask: Finished spill 0
11/04/24 00:32:24 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
11/04/24 00:32:24 INFO mapred.LocalJobRunner: Records R/W=4/1
11/04/24 00:32:24 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
11/04/24 00:32:24 INFO mapred.LocalJobRunner:
11/04/24 00:32:24 INFO mapred.Merger: Merging 1 sorted segments
11/04/24 00:32:24 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 92 bytes
11/04/24 00:32:24 INFO mapred.LocalJobRunner:
11/04/24 00:32:24 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/local/hadoop/./R/reducer.r]
11/04/24 00:32:24 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
11/04/24 00:32:24 INFO streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
11/04/24 00:32:24 INFO streaming.PipeMapRed: MRErrorThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: Records R/W=15/1
11/04/24 00:32:24 INFO streaming.PipeMapRed: MROutputThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: mapRedFinished
11/04/24 00:32:24 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
11/04/24 00:32:24 INFO mapred.LocalJobRunner:
11/04/24 00:32:24 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
11/04/24 00:32:24 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/usr/local/hadoop/R/output
11/04/24 00:32:24 INFO mapred.LocalJobRunner: Records R/W=15/1 > reduce
11/04/24 00:32:24 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
11/04/24 00:32:24 INFO streaming.StreamJob:  map 100%  reduce 100%
11/04/24 00:32:24 INFO streaming.StreamJob: Job complete: job_local_0001
11/04/24 00:32:24 INFO streaming.StreamJob: Output: R/output

おおお
動きました!
${HADOOP_HOME}/R/outputにはpart-00000ができています。

a        4
b        1
c        2
d        4
e        1
f        1
g        2 

と集計されていますね。

ともあれ、何をしているかきちんと理解しないと積み上げにはなりません。
Rのあれこれも忘れているので一つずつ見て行きます。

やっていること

mapper.rでは、入力ファイルを読みこんで、行毎に\tで分割。
{文字:1}というkey:valueの形式を標準出力へ

reducer.rでは、環境envを用意(この辺りよく分かってません)。
Rでハッシュを使うための工夫のような気がしています。
標準入力から、mapの結果の{key:value}を受け取ってパース。
envにワードのハッシュがあればカウントを+1する。
なければ新規に1を登録。
というかたちで、ワードのカウント結果をenvに記録していく。
という流れです。

最終的には"単語"\t"カウント数"\nという形で標準出力へ

Hadoopに添付されているサンプルコードの
${HADOOP_HOME}/src/examples/org/apache/hadoop/examples/WordCount.java
を見ると絶望的な気持ちになりますが、Hadoop streamingで書くと非常にすっきりしますね。
ハッシュをもっとすっきりと書ける言語だとさらに見通しは良くなりそうです。

MapとReduceを慣れ親しんだ言語で書けるのは非常に楽ですね。


Hadoop
HadoopTom White 玉川 竜司

オライリージャパン 2010-01-25
売り上げランキング : 39100


Amazonで詳しく見る
by G-Tools

以下、Rの関数を色々と調べた過程の残骸

せっかくなので残しておきます。
con = file(description="stdin",open="r")
Functions to create, open and close connections.
descriptioncharacter string. A description of the connection: see ‘Details’. Use "stdin" to refer to the C-level ‘standard input’ of the process (which need not be connected to anything in a console or embedded version of R). See also stdin() for the subtly different R-level concept ofstdin.標準入力をオープン


while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0)
Read some or all text lines from a connection.
cona connection object or a character string.ninteger. The (maximal) number of lines to read. Negative values indicate that one should read up to the end of input on the connection.warnlogical. Warn if a text file is missing a final EOL. 標準入力から一行ずつ読んで、入力がなくなるまでループ

env <- new.env(hash = TRUE)
Get, set, test for and create environments.
hasha logical, if TRUE the environment will use a hash table. new.env returns a new (empty) environment with (by default) enclosure the parent frame.ハッシュテーブルを使って環境を作成?
環境がちょっとわかんない><

if (exists(key, envir = env, inherits = FALSE))
Look for an R object of the given name.
xa variable name (given as a character string). enviran alternative way to specify an environment to look in, but it is usually simpler to just use the where argument.
inheritshould the enclosing frames of the environment be searched? keyがenvに存在しているかを調べる

assign(key, count + value, envir = env)
Assign a value to a name in an environment.
xa variable name, given as a character string. No coercion is done, and the first element of a character vector of length greater than one will be used, with a warning.valuea value to be assigned to x. envirthe environment to use. See the details section.
envにあるkeyという変数にcout+valueを割り当てる


for (key in ls(env, all = TRUE))
ls and objects return a vector of character strings giving the names of the objects in the specified environment. When invoked with no argument at the top level prompt, ls shows what data sets and functions a user has defined. When invoked with no argument inside a function, ls returns the names of the functions local variables. This is useful in conjunction with browser.
namewhich environment to use in listing the available objects. Defaults to the current environment. Although called name for back compatibility, in fact this argument can specify the environment in any form; see the details section. enviran alternative argument to name for specifying the environment. Mostly there for back compatibility.all.namesa logical value. If TRUE, all object names are returned. If FALSE, names which begin with a . are omitted.
env内の全ての名前を返して、keyに渡してループ



cat(key, "\t", get(key, envir = env), "\n", sep = " ")

Outputs the objects, concatenating the representations. cat performs much less conversion than print.
Search for an R object with a given name and return it....R objects (see ‘Details’ for the types of objects allowed).
Search for an R object with a given name and return it.

key \t (envのkeyに割り当てられられている値) \nで出力