Tuesday 21 October 2014

Hadoop/R Integration: Streaming

If you've spent any time with MapReduce frameworks, by now you probably know that the word-count example is the MapReduce equivalent of "Hello World!".  In earlier posts I tried to be slightly different, using an equally simple problem: counting up the total dollar value, by state, of new issues of mortgage-backed securities pooled by Fannie Mae.

So far I have used "straight" Java and Pig, and now I'll turn my attention to R.  After the example, we'll discuss what makes R unique in this situation, and why a word-count type of example doesn't really do R justice.  Up front, I'll mention that my main references here are Joseph Adler's R in a Nutshell (Chapter 26) and Tom White's Hadoop: The Definitive Guide (Chapter 2).

There are a number of ways to use R with Hadoop, including:
  • Hadoop streaming, the subject of this post
  • RHadoop, an R/Hadoop integration (see the RHadoop Wiki), the subject of a future post
  • RHIPE (pronounced hree-pay), another R/Hadoop integration
Because of the breadth of topics I'm trying to cover in this blog, I'm going to restrict myself to streaming and RHadoop.

Overview

In Hadoop streaming, your mapper, reducer, and optional combiner are processes that read from standard input and write to standard output.  Once the scripts and data are ready, you simply invoke Hadoop with its streaming jar and a few command-line properties.
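
Because that contract is nothing more than stdin and stdout, you can also dry-run a streaming job locally with ordinary Unix pipes before Hadoop ever gets involved, along the lines of (the input file name here is just a placeholder):

cat some-input.txt | ./map.R | sort | ./reduce.R

We'll do essentially this below, with a small R script standing in for sort.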

As in previous posts, I'll be taking data from Fannie Mae's New Issue Pool Statistics (NIPS) files.  For more info, see a previous post.  I'll be using the same data as in that post, so we can expect an exact match on the results.

The Mapper

NIPS files are a little interesting in that they contain a number of differently-formatted records (check here for the formats).  For our analysis, we will be looking at record type 9, "GEOGRAPHIC DISTRIBUTION", and specifically at fields 3 (state name) and 6 (aggregate unpaid balance).  Since numerous record formats are mixed within a single file, the mapper splits each line on its pipe delimiters and discards records that are not of type 9.  All it needs to do then is output the state name and the aggregate unpaid balance, one instance per type-9 line.

To develop my R scripts, I'm using RStudio, an IDE I learned of through Roger Peng's Computing for Data Analysis course on Coursera.  After an interactive script-building session in RStudio, I produced the following test script:

#! /usr/bin/env Rscript

# Test version: read one NIPS file directly from the local filesystem
conn <- file("/home/hduser/fannie-mae-nips/nips_12262012.txt", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  # Fields are pipe-delimited; field 2 holds the record type
  split.line <- strsplit(next.line, "\\|")
  if (as.numeric(split.line[[1]][2]) == 9) {
    # Type 9: emit state (field 3) and unpaid balance (field 6, with
    # dollar signs and commas stripped), tab-separated
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)

which I then invoked from a shell and got the following output, truncated:

CALIFORNIA    167300.00
FLORIDA    395950.00
GEORGIA    69500.00
ILLINOIS    235200.00
MICHIGAN    781950.00
NEW JERSEY    284550.00
OHIO    334175.00


Since this looks clean, I modified the mapper slightly to produce the final version:

#! /usr/bin/env Rscript

# Final version: identical, but reading from stdin as Hadoop streaming requires
conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  if (as.numeric(split.line[[1]][2]) == 9) {
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)


Note the interesting subscripting needed to get at the results of strsplit: strsplit returns a list, so field 2 of the file record is actually element 2 of the first element of the list, which is a character vector of the parsed fields.  If you need some clarification of this result, see the "Subscripting" chapter of Phil Spector's Data Manipulation with R.  Also note the compact use of gsub to remove the dollar signs and commas from the aggregate unpaid balances.
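
To make that list structure concrete, here's a quick sketch; the pipe-delimited line below is made up for illustration, not real NIPS data:

split.line <- strsplit("POOL1234|9|OHIO|X|Y|$334,175.00", "\\|")
split.line[[1]]                        # character vector: "POOL1234" "9" "OHIO" "X" "Y" "$334,175.00"
split.line[[1]][2]                     # "9" -- field 2, the record type
gsub("[$,]", "", split.line[[1]][6])   # "334175.00" -- field 6, cleaned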

The Reducer

Our reducer will also be reading from stdin, with the following guaranteed by the Hadoop runtime:
  • Every record with a given key is sent to the same reducer, so when a reducer encounters a key it knows it is seeing all the records for that key and can produce a complete result for it;
  • Incoming records are sorted by key, so when the key changes, the reducer knows that all records for the previous key have already appeared in the stream.
Our reducer keeps a couple of variables: one to track which key is currently being processed, and one to hold the running total of aggregate unpaid balances for that state.  When the key changes, we output the running total (with its key) and reset it:

#! /usr/bin/env Rscript

current.key <- NA   # key currently being accumulated
current.upb <- 0.0  # running total of unpaid balances for that key

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\t")
  key <- split.line[[1]][1]
  upb <- as.numeric(split.line[[1]][2])
  if (is.na(current.key)) {
    # First record: start accumulating
    current.key <- key
    current.upb <- upb
  }
  else {
    if (current.key == key) {
      current.upb <- current.upb + upb
    }
    else {
      # Key changed: emit the finished total and reset
      write(paste(current.key, current.upb, sep="\t"), stdout())
      current.key <- key
      current.upb <- upb
    }
  }
}
# Emit the total for the final key
write(paste(current.key, current.upb, sep="\t"), stdout())
close(conn)
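
As a quick sanity check, the reducer can be exercised straight from a shell with a made-up two-record input:

printf 'OHIO\t100\nOHIO\t50\n' | ./reduce.R

which prints a single line: OHIO, a tab, and 150.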

Now I'd like to test this reducer on a single file, but there is a small issue -- my mapper does not sort its output (it doesn't need to; Hadoop sorts between the map and reduce phases), while my reducer expects its input sorted by key.  I could just wait and see how the final numbers come out, but since streaming just involves piping stdout to stdin, I'm a little curious how fast this task runs outside of Hadoop (not a real comparison, on a simple single-node cluster; I'm just curious).  And since I'm still learning R, I next write an R script to sort the rows by key:

#! /usr/bin/env Rscript

# Read all mapper output, then write it back sorted (lexicographically by
# line, which effectively sorts by the state key)
conn <- file("stdin", open="r")
all.lines <- readLines(conn)
write(sort(all.lines), stdout())
close(conn)
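
(For the record, the stock Unix sort command would do the same job here, e.g. ./map.test.R | sort | ./reduce.R, but part of the point was to practice a little more R.)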


At times like this, I remember why I like R so much!  Next, I process a single file with my "test" version of the mapper:

./map.test.R | ./map.output.sorter.R | ./reduce.R

and get output like the following (abbreviated) for a single NIPS file:

ALABAMA 72699735.21
ALASKA  6883209.62
ARIZONA 287482321.1
ARKANSAS        21579003.98
CALIFORNIA      1811342276.77

...
VIRGIN ISLANDS  1021750
WASHINGTON      239648997.97
WEST VIRGINIA   9925894.94
WISCONSIN       72752945.87
WYOMING 6232557.56


Streaming in Hadoop with R

Now that we have a mapper and a reducer, we can process the entire data set in Hadoop.  I will process the same set of data as I did in my previous Hadoop-Java-Pig comparison post, meaning NIPS data from 23 August to 26 December 2012.  As in that post, I am running Hadoop in pseudo-distributed mode, with the data coming from HDFS.  The difference here, of course, is that I am specifying streaming, and providing my mapper and reducer R scripts.  I launch from the Hadoop home directory:

bin/hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.1.0.jar -input /user/hduser/fannie-mae-nips -output /user/hduser/fannie-mae-nips-r-output -mapper /home/hduser/RScripts/map.R -reducer /home/hduser/RScripts/reduce.R
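
One note: local paths for -mapper and -reducer work here because pseudo-distributed mode runs everything on one machine.  On a real multi-node cluster you would also pass the scripts via the streaming jar's -file option (for example, -file /home/hduser/RScripts/map.R -file /home/hduser/RScripts/reduce.R) so that Hadoop ships them to the task nodes, and then refer to them by bare file name in -mapper and -reducer.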

So, what did I get for my efforts?  Copying my results file from HDFS:

bin/hadoop dfs -copyToLocal /user/hduser/fannie-mae-nips-r-output/part-00000 rResults.txt

yields the following output (abbreviated here):

ALABAMA 3242681838.89999
ALASKA  841797447.200001
ARIZONA 9340767235.06001
ARKANSAS        1452136751.9
CALIFORNIA      89114642822.0799
...

VERMONT 553060435.67
VIRGIN ISLANDS  33604327.46
VIRGINIA        12706719836.48
WASHINGTON      13194248475.54
WEST VIRGINIA   486889587.57
WISCONSIN       8140391871.79
WYOMING 720905726.84


I still have the outputs from my previous post on this same data set, using Java and Pig; perusing those results shows the following (abbreviated here; note that I did not diff the files, because the numbers were output in a different format):

ALABAMA 3.242681838899994E9
ALASKA  8.417974472000003E8
ARIZONA 9.340767235060005E9
ARKANSAS        1.452136751900001E9
CALIFORNIA      8.91146428220799E10

....
VERMONT 5.530604356700001E8
VIRGIN ISLANDS  3.360432746000001E7
VIRGINIA        1.2706719836479996E10
WASHINGTON      1.319424847554002E10
WEST VIRGINIA   4.868895875700002E8
WISCONSIN       8.140391871790002E9
WYOMING 7.209057268400007E8


So, I successfully duplicated the Java and Pig examples using R and Hadoop streaming.

Final Comments about Hadoop and R

If you are at all familiar with R, you understand that R isn't a language you pick up just to split lines of output and sum numbers; the language and its libraries contain a wealth of functionality.  The point of this post was primarily to work through the mechanical details of using R with Hadoop streaming.  Where R would really shine is when there is statistical "heavy lifting" to do that decomposes naturally into map-style and reduce-style tasks.  For example, if you were fitting a linear regression against a huge data set with a large number of variables, or performing a Shapiro-Wilk test against a large data set, the ability to split the job into parallel tasks and combine them at the end with a reducer would be a great example of Hadoop/R synergy; a toy sketch of the regression case appears below.  For more information on parallel computation in R, see Chapter 26 of Joseph Adler's R in a Nutshell, especially the "Where to Learn More" section at the end of the chapter.
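
To make that last point slightly more concrete, here is a toy sketch (simulated data, entirely hypothetical, and run in a single R session rather than in Hadoop) of why least squares decomposes so nicely: both X'X and X'y are sums over rows, so each mapper could emit the partial crossproducts for its chunk of rows, and a reducer could add them up and solve the normal equations:

# Toy sketch: ordinary least squares decomposed into "map" and "reduce" steps.
# All data below is simulated; nothing here touches the NIPS files.
set.seed(42)
n <- 10000
X <- cbind(1, matrix(rnorm(n * 2), ncol=2))   # intercept plus two predictors
y <- X %*% c(2, -1, 0.5) + rnorm(n)

# "Map": each chunk of rows contributes partial crossproducts X'X and X'y
chunks <- split(seq_len(n), rep(1:4, length.out=n))
partials <- lapply(chunks, function(idx)
  list(xtx = crossprod(X[idx, , drop=FALSE]),
       xty = crossprod(X[idx, , drop=FALSE], y[idx, , drop=FALSE])))

# "Reduce": sum the partials and solve the normal equations
xtx <- Reduce(`+`, lapply(partials, function(p) p$xtx))
xty <- Reduce(`+`, lapply(partials, function(p) p$xty))
beta <- solve(xtx, xty)   # same coefficients lm() would give on the full data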
