MapReduce in R with Hadoop Streaming




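So this time I'm going to try Hadoop Streaming, which is exactly the right tool for this.
Put simply, Hadoop Streaming uses standard input and output to chain the processing like this:
input => map in R => intermediate results => reduce in R => output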
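To get a feel for that flow before involving Hadoop at all, here is a minimal sketch that mimics it inside a single R session. The helper names `map_lines` and `reduce_pairs` are my own (they are not part of Hadoop or R), and the `sort()` call only stands in for the sorting Hadoop does between the map and reduce steps.

```r
# Simulate input => map => intermediate results => reduce => output in plain R.
# map_lines and reduce_pairs are hypothetical helper names, not Hadoop APIs.
map_lines <- function(lines) {
  words <- unlist(strsplit(lines, "\t"))
  paste(words, 1, sep = "\t")            # one "word<TAB>1" record per word
}

reduce_pairs <- function(records) {
  fields <- strsplit(records, "\t")
  keys   <- vapply(fields, `[`, character(1), 1)
  values <- as.integer(vapply(fields, `[`, character(1), 2))
  tapply(values, keys, sum)              # total count per key
}

# Four tab-separated lines of words as input.
input        <- c("a\tb\tc", "d\te\tf\tg", "g\td\tc\ta", "a\ta\td\td")
intermediate <- sort(map_lines(input))   # stand-in for Hadoop's sort by key
counts       <- reduce_pairs(intermediate)
print(counts)
```

With this input the map step produces 15 records, and the reduce step counts a and d four times each, c and g twice, and b, e and f once.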


I'll try it out based on the article "R言語で MapReduce −Hadoop Streaming−" on hamadakoichi blog.



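mapper.r:

#!/usr/bin/env Rscript
# Mapper: emit "word<TAB>1" for every tab-separated word read from stdin.
con = file(description="stdin",open="r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        line <- unlist(strsplit(line, "\t"))
        for(word in line){
                cat(sprintf("%s\t%s\n", word, 1), sep = "")
        }
}
close(con)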



reducer.r:

#!/usr/bin/env Rscript
# Reducer: sum the counts per word, using an environment as a hash table.
env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        line <- unlist(strsplit(line, "\t"))
        key <- line[1]
        value <- as.integer(line[2])
        if (exists(key, envir = env, inherits = FALSE)) {
                count <- get(key, envir = env)
                assign(key, count + value, envir = env)
        } else {
                assign(key, value, envir = env)
        }
}
close(con)
# Print "key<TAB>total" for every word seen.
for (key in ls(env, all = TRUE)) {
        cat(key, "\t", get(key, envir = env), "\n", sep = " ")
}


test.data (fields are separated by tabs):

a       b       c
d       e       f       g
g       d       c       a
a       a       d       d


$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -files mapper.r,reducer.r -input R/test.data -output R/output -mapper mapper.r -reducer reducer.r

11/04/24 00:32:22 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
11/04/24 00:32:22 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
11/04/24 00:32:22 INFO mapred.FileInputFormat: Total input paths to process : 1
11/04/24 00:32:22 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
11/04/24 00:32:22 INFO streaming.StreamJob: Running job: job_local_0001
11/04/24 00:32:22 INFO streaming.StreamJob: Job running in-process (local Hadoop)
11/04/24 00:32:22 INFO mapred.FileInputFormat: Total input paths to process : 1
11/04/24 00:32:23 INFO mapred.MapTask: numReduceTasks: 1
11/04/24 00:32:23 INFO mapred.MapTask: io.sort.mb = 100
11/04/24 00:32:23 INFO mapred.MapTask: data buffer = 79691776/99614720
11/04/24 00:32:23 INFO mapred.MapTask: record buffer = 262144/327680
11/04/24 00:32:23 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/local/hadoop/./R/mapper.r]
11/04/24 00:32:23 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
11/04/24 00:32:23 INFO streaming.StreamJob:  map 0%  reduce 0%
11/04/24 00:32:23 INFO streaming.PipeMapRed: Records R/W=4/1
11/04/24 00:32:24 INFO streaming.PipeMapRed: MROutputThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: MRErrorThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: mapRedFinished
11/04/24 00:32:24 INFO mapred.MapTask: Starting flush of map output
11/04/24 00:32:24 INFO mapred.MapTask: Finished spill 0
11/04/24 00:32:24 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
11/04/24 00:32:24 INFO mapred.LocalJobRunner: Records R/W=4/1
11/04/24 00:32:24 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
11/04/24 00:32:24 INFO mapred.LocalJobRunner:
11/04/24 00:32:24 INFO mapred.Merger: Merging 1 sorted segments
11/04/24 00:32:24 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 92 bytes
11/04/24 00:32:24 INFO mapred.LocalJobRunner:
11/04/24 00:32:24 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/local/hadoop/./R/reducer.r]
11/04/24 00:32:24 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
11/04/24 00:32:24 INFO streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
11/04/24 00:32:24 INFO streaming.PipeMapRed: MRErrorThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: Records R/W=15/1
11/04/24 00:32:24 INFO streaming.PipeMapRed: MROutputThread done
11/04/24 00:32:24 INFO streaming.PipeMapRed: mapRedFinished
11/04/24 00:32:24 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
11/04/24 00:32:24 INFO mapred.LocalJobRunner:
11/04/24 00:32:24 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
11/04/24 00:32:24 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to file:/usr/local/hadoop/R/output
11/04/24 00:32:24 INFO mapred.LocalJobRunner: Records R/W=15/1 > reduce
11/04/24 00:32:24 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
11/04/24 00:32:24 INFO streaming.StreamJob:  map 100%  reduce 100%
11/04/24 00:32:24 INFO streaming.StreamJob: Job complete: job_local_0001
11/04/24 00:32:24 INFO streaming.StreamJob: Output: R/output


a        4
b        1
c        2
d        4
e        1
f        1
g        2 







Looking at … fills me with despair, but written with Hadoop Streaming it comes out really clean.


con = file(description="stdin",open="r")
Functions to create, open and close connections.
description: character string. A description of the connection: see ‘Details’. Use "stdin" to refer to the C-level ‘standard input’ of the process (which need not be connected to anything in a console or embedded version of R). See also stdin() for the subtly different R-level concept of stdin.
This opens standard input.

while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0)
Read some or all text lines from a connection.
con: a connection object or a character string.
n: integer. The (maximal) number of lines to read. Negative values indicate that one should read up to the end of input on the connection.
warn: logical. Warn if a text file is missing a final EOL.
This reads standard input one line at a time and loops until there is no more input.
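The loop condition is easier to see when it is run interactively. The sketch below swaps `file("stdin")` for a `textConnection` holding two made-up lines (my substitution, purely so that it runs without piped input); the `while` condition itself is the same as in the scripts.

```r
# textConnection stands in for file("stdin") so the loop can be tried interactively.
con <- textConnection(c("a\tb\tc", "d\te"))
n_lines <- 0
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  n_lines <- n_lines + 1
  cat("read:", line, "\n")
}
close(con)

# readLines() returns character(0) once the connection is exhausted,
# so length(...) becomes 0 and the loop ends.
print(n_lines)
```

Because `n = 1`, only one line is held in memory at a time, which is what you want when Hadoop pipes a large split into the script.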

env <- new.env(hash = TRUE)
Get, set, test for and create environments.
hash: a logical, if TRUE the environment will use a hash table.
new.env returns a new (empty) environment with (by default) enclosure the parent frame.
So this creates an environment backed by a hash table, I think?

if (exists(key, envir = env, inherits = FALSE))
Look for an R object of the given name.
x: a variable name (given as a character string).
envir: an alternative way to specify an environment to look in, but it is usually simpler to just use the where argument.
inherits: should the enclosing frames of the environment be searched?
This checks whether key exists in env.
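The `inherits = FALSE` part matters more than it looks, because the words being counted can collide with names R already knows. A small sketch, assuming a fresh environment created at the top level of a script: the word "c" from the test data is also the name of base R's `c()` function.

```r
env <- new.env(hash = TRUE)   # empty: no word has been counted yet

# With the default inherits = TRUE the lookup continues into the enclosing
# environments and finds base R's c() function.
found_with_inherits    <- exists("c", envir = env)

# With inherits = FALSE only env itself is searched.
found_without_inherits <- exists("c", envir = env, inherits = FALSE)

print(found_with_inherits)     # TRUE
print(found_without_inherits)  # FALSE
```

Without `inherits = FALSE`, the first "c" record would look as if it had already been counted, and the reducer would pick up a function where it expects a number.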

assign(key, count + value, envir = env)
Assign a value to a name in an environment.
x: a variable name, given as a character string. No coercion is done, and the first element of a character vector of length greater than one will be used, with a warning.
value: a value to be assigned to x.
envir: the environment to use. See the details section.

for (key in ls(env, all = TRUE))
ls and objects return a vector of character strings giving the names of the objects in the specified environment. When invoked with no argument at the top level prompt, ls shows what data sets and functions a user has defined. When invoked with no argument inside a function, ls returns the names of the function's local variables. This is useful in conjunction with browser.
name: which environment to use in listing the available objects. Defaults to the current environment. Although called name for back compatibility, in fact this argument can specify the environment in any form; see the details section.
envir: an alternative argument to name for specifying the environment. Mostly there for back compatibility.
all.names: a logical value. If TRUE, all object names are returned. If FALSE, names which begin with a . are omitted.
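In the reducer, `all = TRUE` is matched to the `all.names` argument by R's partial argument matching. Here is a small sketch of why it is there, using made-up keys: a word that starts with a dot would be left out of the listing, and so out of the output, without it.

```r
env <- new.env()
assign("b", 1L, envir = env)
assign("a", 2L, envir = env)
assign(".hidden", 3L, envir = env)   # a key that begins with "."

visible   <- ls(env)                    # names beginning with "." are omitted
all_names <- ls(env, all.names = TRUE)  # every name, including ".hidden"

print(visible)
print(all_names)
```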

cat(key, "\t", get(key, envir = env), "\n", sep = " ")

cat: Outputs the objects, concatenating the representations. cat performs much less conversion than print.
...: R objects (see ‘Details’ for the types of objects allowed).
get: Search for an R object with a given name and return it.

This prints key \t (the value assigned to key in env) \n. Because sep = " ", a space is also written between each of those pieces.
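The effect of `sep` is easy to check by capturing what `cat` writes. This is only a sketch with a made-up key and value; `capture.output` is used so the result can be inspected as a string.

```r
# Same argument layout as the reducer's cat() call, with sep = " ".
with_spaces <- capture.output(cat("a", "\t", 4, "\n", sep = " "))

# The same call with sep = "" writes a bare key<TAB>value record.
no_spaces   <- capture.output(cat("a", "\t", 4, "\n", sep = ""))

print(with_spaces == "a \t 4 ")   # TRUE: a space on each side of the tab, plus a trailing one
print(no_spaces == "a\t4")        # TRUE
```

That is where the extra spaces in the job output come from. If the output is going to be fed into another tab-separated step, `sep = ""` is probably the safer choice, though the original script works as it is for simply viewing the counts.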