
Tuesday 21 October 2014

Hadoop/R Integration: Streaming

If you've spent any time with MapReduce frameworks in general, by now you probably know the word-count example is the MapReduce equivalent of "Hello World!".  In earlier posts, I tried to be slightly different, but with an equally-simple problem - counting up the total dollar value, by state, of new issues of mortgage-backed securities pooled by Fannie Mae.

I have used "straight" Java and Pig so far, and now I'll turn my attention to R.  After our example, we'll discuss what makes R unique in this situation, and why a word-count type of example doesn't really do R justice.  I'll mention up front that my main references here are Joseph Adler's R in a Nutshell (see chapter 26) and Tom White's Hadoop: The Definitive Guide (chapter 2).

There are a number of ways to use R with Hadoop, including:
  • Hadoop streaming, the subject of this post;
  • RHadoop, an R/Hadoop integration (see the RHadoop Wiki), the subject of a future post;
  • RHIPE (pronounced "hree-pay"), another R/Hadoop integration.
Because of the breadth of topics I'm trying to cover in this blog, I'm going to restrict myself to streaming and RHadoop for this topic.

Overview

In Hadoop streaming, your mapper, reducer, and optional combiner processes are written to read from standard input and to write to standard output.  Once the scripts and data are ready, you simply invoke Hadoop with its streaming jar, passing a few command-line properties.

As in previous posts, I'll be taking data from Fannie Mae's New Issue Pool Statistics (NIPS) files.  For more info, see a previous post.  I'll be using the same data as in that post, so we can expect an exact match on the results.

The Mapper

NIPS files are a little interesting, in that they contain a number of differently-formatted records (check here for the formats).  To perform our analysis, we will be looking at record type 9, "GEOGRAPHIC DISTRIBUTION".  We will be interested in columns 3 (state name) and 6 (aggregate unpaid balance).  Since numerous record formats are mixed within a single file, we'll first split the file on the pipe delimiters and discard the records that are not of type 9.  All we need to do is output the state name and the aggregate unpaid balance, one instance per type-9 line.

To develop my R scripts, I'm using RStudio, an IDE I learned of through Roger Peng's Computing for Data Analysis course on Coursera.  After an interactive script-building session in RStudio, I produced the following test script:

#! /usr/bin/env Rscript

conn <- file("/home/hduser/fannie-mae-nips/nips_12262012.txt", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  if (as.numeric(split.line[[1]][2]) == 9) {
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)

which I then invoked from a shell and got the following output, truncated:

CALIFORNIA    167300.00
FLORIDA    395950.00
GEORGIA    69500.00
ILLINOIS    235200.00
MICHIGAN    781950.00
NEW JERSEY    284550.00
OHIO    334175.00


Since this looks clean, I modified the mapper slightly to produce the final version:

#! /usr/bin/env Rscript

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  if (as.numeric(split.line[[1]][2]) == 9) {
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)


Note the interesting subscripting needed to get at the results of strsplit:  strsplit returns a list, so field 2 of the file record is actually element 2 of the first element of the list, which is a vector of parsed fields.  If you need some clarification of this result, see the "Subscripting" chapter of Phil Spector's Data Manipulation with R.  Also note the compact usage of gsub to remove the dollar signs and commas from the aggregate unpaid balances.
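To make the strsplit subscripting concrete, here is a throwaway interactive check (an illustration only, using a made-up record):

> split.line <- strsplit("123|9|CALIFORNIA", "\\|")
> split.line[[1]]
[1] "123"        "9"          "CALIFORNIA"
> split.line[[1]][2]
[1] "9"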

The Reducer

Our reducer will also be reading from stdin, with the following guaranteed by the Hadoop runtime:
  • All records sharing a key are sent to the same reducer, so the reducer can produce its output knowing it has received every record for that key;
  • Incoming records are sorted by key, so when the key changes, the reducer knows that all records for the previous key have already appeared in the stream.
In our reducer, we will have a couple of variables: one to keep track of which key is being processed, and one to hold the running total of aggregate unpaid balances for mortgages from a given state.  Once a key changes, we will output the current balance running total (with its key) and reset the running balance:

#! /usr/bin/env Rscript

current.key <- NA
current.upb <- 0.0

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\t")
  key <- split.line[[1]][1]
  upb <- as.numeric(split.line[[1]][2])
  if (is.na(current.key)) {
    current.key <- key
    current.upb <- upb
  }
  else {
    if (current.key == key) {
      current.upb <- current.upb + upb
    }
    else {
      write(paste(current.key, current.upb, sep="\t"), stdout())
      current.key <- key
      current.upb <- upb
    }
  }
}
write(paste(current.key, current.upb, sep="\t"), stdout())
close(conn)

Now I'd like to test this reducer on a single file, but there's a small issue -- my mapper does not sort its output (it doesn't need to, of course), while my reducer expects its input to be sorted by key.  I could just wait and see how the final numbers come out, but since streaming simply pipes stdout to stdin, I'm also a little curious how fast this task runs outside of Hadoop (not a real comparison, on a simple single-node cluster; just curiosity).  And since I'm still learning R, I next wrote a short script to sort the mapper's output rows by key:

#! /usr/bin/env Rscript

conn <- file("stdin", open="r")
all.lines <- readLines(conn)
write(sort(all.lines), stdout())
close(conn)


At times like this, I remember why I like R so much!  Next, I process a single file with my "test" version of the mapper:

./map.test.R | ./map.output.sorter.R | ./reduce.R

and get output like the following (abbreviated) for a single NIPS file:

ALABAMA 72699735.21
ALASKA  6883209.62
ARIZONA 287482321.1
ARKANSAS        21579003.98
CALIFORNIA      1811342276.77

...
VIRGIN ISLANDS  1021750
WASHINGTON      239648997.97
WEST VIRGINIA   9925894.94
WISCONSIN       72752945.87
WYOMING 6232557.56


Streaming in Hadoop with R

Now that we have a mapper and a reducer, we can process the entire data set in Hadoop.  I will process the same set of data as I did in my previous Hadoop-Java-Pig comparison post, meaning NIPS data from 23 August to 26 December 2012.  As in that post, I am running Hadoop in pseudo-distributed mode, with the data coming from HDFS.  The difference here, of course, is that I am specifying streaming, and providing my mapper and reducer R scripts.  I launch from the Hadoop home directory:

bin/hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.1.0.jar -input /user/hduser/fannie-mae-nips -output /user/hduser/fannie-mae-nips-r-output -mapper /home/hduser/RScripts/map.R -reducer /home/hduser/RScripts/reduce.R

So, what did I get for my efforts?  Copying my results file from HDFS:

bin/hadoop dfs -copyToLocal /user/hduser/fannie-mae-nips-r-output/part-00000 rResults.txt

yields the following output (abbreviated here):

ALABAMA 3242681838.89999
ALASKA  841797447.200001
ARIZONA 9340767235.06001
ARKANSAS        1452136751.9
CALIFORNIA      89114642822.0799
...

VERMONT 553060435.67
VIRGIN ISLANDS  33604327.46
VIRGINIA        12706719836.48
WASHINGTON      13194248475.54
WEST VIRGINIA   486889587.57
WISCONSIN       8140391871.79
WYOMING 720905726.84


I still have the outputs from my previous post on this same data set, using Java and Pig; perusing those files shows the following (note that I did not simply diff the files, because the numbers were written out in a different format):

ALABAMA 3.242681838899994E9
ALASKA  8.417974472000003E8
ARIZONA 9.340767235060005E9
ARKANSAS        1.452136751900001E9
CALIFORNIA      8.91146428220799E10

....
VERMONT 5.530604356700001E8
VIRGIN ISLANDS  3.360432746000001E7
VIRGINIA        1.2706719836479996E10
WASHINGTON      1.319424847554002E10
WEST VIRGINIA   4.868895875700002E8
WISCONSIN       8.140391871790002E9
WYOMING 7.209057268400007E8


So, I successfully duplicated the Java and Pig examples using R and Hadoop streaming.
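
As an extra sanity check, the two result sets can also be compared numerically in R rather than as text.  Here is a quick sketch, assuming the earlier Java output had been saved as javaResults.txt (a hypothetical file name; both files are tab-separated state/total pairs):

# "javaResults.txt" is a hypothetical name for the saved Java output
r.res <- read.table("rResults.txt", sep="\t", col.names=c("state", "total"))
java.res <- read.table("javaResults.txt", sep="\t", col.names=c("state", "total"))
merged <- merge(r.res, java.res, by="state")
all.equal(merged$total.x, merged$total.y)  # TRUE if the totals agree within tolerance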

Final Comments about Hadoop and R

If you are at all familiar with R, you understand that R isn't a language you pick up just to split lines of output and sum numbers; the language and its libraries contain a wealth of functionality.  The point of this post was primarily to work through the mechanical details of using R with Hadoop streaming.  Where R would really shine is if we had some "heavy lifting" to do with R that was easily decomposable into map-style and reduce-style tasks.  For example, if you were fitting a linear regression against a huge data set, using a large number of variables, or if you were performing a Shapiro-Wilk test against a large data set, the ability to split up the job into parallel tasks, combining them at the end with a reducer, would be a great example of Hadoop/R synergy.  For more information on parallel computation in R, see chapter 26 of Joseph Adler's R in a Nutshell, especially his "Where to Learn More" section at the end of the chapter.
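
To make the decomposition idea concrete, here is a minimal standalone sketch (not part of the NIPS example above): each "mapper" emits a partial sum and count for its split, and the "reducer" combines the partials into a global mean.

# Map step, run independently on each split: emit partial statistics, not raw data
map.stats <- function(x) c(sum = sum(x), n = length(x))

# Reduce step: combine the partial results into the global statistic
reduce.mean <- function(parts) {
  totals <- Reduce(`+`, parts)
  totals[["sum"]] / totals[["n"]]
}

parts <- lapply(split(1:100, rep(1:4, each = 25)), map.stats)  # four "mappers"
reduce.mean(parts)  # 50.5, identical to mean(1:100)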

R examples

R: Deriving a new data frame column based on containing string

I’ve been playing around with R data frames a bit more and one thing I wanted to do was derive a new column based on the text contained in the existing column.
I started with something like this:
> x = data.frame(name = c("Java Hackathon", "Intro to Graphs", "Hands on Cypher"))
> x
             name
1  Java Hackathon
2 Intro to Graphs
3 Hands on Cypher
And I wanted to derive a new column based on whether or not the session was a practical one. The grepl function seemed to be the best tool for the job:
> grepl("Hackathon|Hands on|Hands On", x$name)
[1]  TRUE FALSE  TRUE
We can then add a column to our data frame with that output:
x$practical = grepl("Hackathon|Hands on|Hands On", x$name)
And we end up with the following:
> x
             name practical
1  Java Hackathon      TRUE
2 Intro to Graphs     FALSE
3 Hands on Cypher      TRUE
Not too tricky but it took me a bit too long to figure it out so I thought I’d save future Mark some time!
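One small refinement I spotted afterwards: grepl accepts an ignore.case argument, so the "Hands on|Hands On" duplication in the pattern can be collapsed:

x$practical = grepl("hackathon|hands on", x$name, ignore.case = TRUE)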

How to Import Some Parts of a Large Database

In the introduction of Computational Actuarial Science with R, there was a short paragraph on how we could import only some parts of a large database, by selecting specific variables. The trick was to use the following:
read.table.select.columns = function(datatablename, I, sep=";") {
  # Read a single row just to learn the column names
  datanc = read.table(datatablename, header=TRUE, sep=sep, skip=0, nrows=1)
  # colClasses="NULL" tells read.table to skip a column; NA means auto-detect
  mycols = rep("NULL", ncol(datanc))
  names(mycols) = names(datanc)
  mycols[I] = NA
  datat = read.table(datatablename, header=TRUE, sep=sep, colClasses=mycols)
  return(datat)
}
For instance, if we use the same dataset as in the introduction, we can import only two variables of interest,
> dt1 = read.table.select.columns(loc, c("Region", "Wmax"), sep=",")
> head(dt1, 10)
    Region      Wmax
1    Basin 105.56342
2    Basin  40.00000
3    Basin  35.41822
4    Basin  51.06743
5  Florida  87.34328
6    Basin  96.64138
7     Gulf  35.41822
8       US  35.41822
9       US  87.34328
10      US 106.35318
> dim(dt1)
[1] 2100    2
In other cases, it might be interesting to select some rows, or to skip some of them (e.g. because of typos in the original dataset). If we want to drop some specific rows, we can use:
read.table.drop.rows = function(datatablename, I, sep=";") {
  I = sort(I)
  if (min(I) > 1) minI = 1
  if (min(I) == 1) minI = NULL
  # index1/index2 delimit the blocks of consecutive rows to keep
  index1 = c(minI, I[c(which(diff(I) > 1), length(I))] + 1)
  index2 = c(I[c(minI, which(diff(I) > 1) + 1)], max(index1) - 1)
  # Read a single row just to learn the column names
  datat = read.table(datatablename, header=TRUE, sep=sep, skip=0, nrows=1)
  datat = datat[-1, ]
  # Read each block of kept rows and stack them
  for (i in 1:length(index1)) {
    datat0 = read.table(datatablename, header=FALSE, sep=sep,
                        skip=index1[i], nrows=index2[i]-index1[i])
    names(datat0) = names(datat)
    datat = rbind(datat, datat0)
  }
  return(datat)
}
On the same dataset, assume that we have trouble reading some lines, or that we know some values are not valid:
> dt2 = read.table.drop.rows(loc, c(3,6:8), sep=",")
> head(dt2[,1:5], 10)
     Yr  Region      Wmax      sst sun
1  1899   Basin 105.56342 0.046596 8.4
2  1899   Basin  40.00000 0.046596 8.4
3  1899   Basin  51.06743 0.046596 8.4
4  1899 Florida  87.34328 0.046596 8.4
5  1899      US  87.34328 0.046596 8.4
6  1899      US 106.35318 0.046596 8.4
7  1899      US  51.06743 0.046596 8.4
8  1899      US  90.19791 0.046596 8.4
9  1899   Basin  56.48593 0.046596 8.4
10 1899   Basin 131.26902 0.046596 8.4
> dim(dt2)
[1] 2096   11
Now, we can try to do both at the same time (more or less): for some specific variables, we want to import a subpart of the database, while also avoiding some specific lines:
read.table.select.columns.rows = function(datatablename, Ic, Ir, sep=";") {
  # Column selection, as before: "NULL" skips a column, NA auto-detects
  datanc = read.table(datatablename, header=TRUE, sep=sep, skip=0, nrows=1)
  mycols = rep("NULL", ncol(datanc))
  names(mycols) = names(datanc)
  mycols[Ic] = NA
  # Row selection, as before: blocks of consecutive rows to keep
  I = sort(Ir)
  if (min(I) > 1) minI = 1
  if (min(I) == 1) minI = NULL
  index1 = c(minI, I[c(which(diff(I) > 1), length(I))] + 1)
  index2 = c(I[c(minI, which(diff(I) > 1) + 1)], max(index1) - 1)
  datat = read.table(datatablename, header=TRUE, sep=sep, skip=0, nrows=1,
                     colClasses=mycols)
  datat = datat[-1, ]
  for (i in 1:length(index1)) {
    datat0 = read.table(datatablename, header=FALSE, sep=sep, skip=index1[i],
                        nrows=index2[i]-index1[i], colClasses=mycols)
    names(datat0) = names(datat)
    datat = rbind(datat, datat0)
  }
  return(datat)
}

> dt3 = read.table.select.columns.rows(loc, c("Wmax","Region"), c(3,6:8), sep=",")
> head(dt3)
   Region      Wmax
1   Basin 105.56342
2   Basin  40.00000
3   Basin  51.06743
4 Florida  87.34328
5      US  87.34328
6      US 106.35318
> dim(dt3)
[1] 2096    2
One can check that this gives the same result as:
> df = read.table(loc, header=TRUE, sep=",")
> sdf = df[-c(3,6:8), c("Wmax","Region")]
> head(sdf)
        Wmax  Region
1  105.56342   Basin
2   40.00000   Basin
4   51.06743   Basin
5   87.34328 Florida
9   87.34328      US
10 106.35318      US
> dim(sdf)
[1] 2096    2
But if the dataset were much larger (with thousands of variables), and also had problems on specific lines, we now have some nice code to import our database.
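As an aside, if adding a package is an option, I believe the data.table package's fread covers the column-selection half of this directly via its select argument (a sketch only; note that fread returns a data.table rather than a data.frame, and the row-dropping logic above would still be needed):

library(data.table)
dt1 = fread(loc, sep=",", select=c("Region", "Wmax"))  # selects columns; rows unaffected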

R: Filtering data frames by column type ('x' must be numeric)

I’ve been working through the exercises from An Introduction to Statistical Learning and one of them required you to create a pairwise correlation matrix of the variables in a data frame.
The exercise uses the ‘Carseats’ data set which can be imported like so:
> install.packages("ISLR")
> library(ISLR)
> head(Carseats)
  Sales CompPrice Income Advertising Population Price ShelveLoc Age Education Urban  US
1  9.50       138     73          11        276   120       Bad  42        17   Yes Yes
2 11.22       111     48          16        260    83      Good  65        10   Yes Yes
3 10.06       113     35          10        269    80    Medium  59        12   Yes Yes
4  7.40       117    100           4        466    97    Medium  55        14   Yes Yes
5  4.15       141     64           3        340   128       Bad  38        13   Yes  No
6 10.81       124    113          13        501    72       Bad  78        16    No Yes
If we try to run the ‘cor’ function on the data frame we’ll get the following error:
> cor(Carseats)
Error in cor(Carseats) : 'x' must be numeric
As the error message suggests, we can’t pass non-numeric variables to this function, so we need to remove the categorical variables from our data frame.
But first we need to work out which columns those are:
> sapply(Carseats, class)
      Sales   CompPrice      Income Advertising  Population       Price   ShelveLoc         Age   Education 
  "numeric"   "numeric"   "numeric"   "numeric"   "numeric"   "numeric"    "factor"   "numeric"   "numeric" 
      Urban          US 
   "factor"    "factor" 
We can see a few columns of type ‘factor’ and luckily for us there’s a function which will help us identify those more easily:
> sapply(Carseats, is.factor)
      Sales   CompPrice      Income Advertising  Population       Price   ShelveLoc         Age   Education 
      FALSE       FALSE       FALSE       FALSE       FALSE       FALSE        TRUE       FALSE       FALSE 
      Urban          US 
       TRUE        TRUE 
Now we can remove those columns from our data frame and create the correlation matrix:
> cor(Carseats[sapply(Carseats, function(x) !is.factor(x))])
                  Sales   CompPrice       Income  Advertising   Population       Price          Age    Education
Sales        1.00000000  0.06407873  0.151950979  0.269506781  0.050470984 -0.44495073 -0.231815440 -0.051955242
CompPrice    0.06407873  1.00000000 -0.080653423 -0.024198788 -0.094706516  0.58484777 -0.100238817  0.025197050
Income       0.15195098 -0.08065342  1.000000000  0.058994706 -0.007876994 -0.05669820 -0.004670094 -0.056855422
Advertising  0.26950678 -0.02419879  0.058994706  1.000000000  0.265652145  0.04453687 -0.004557497 -0.033594307
Population   0.05047098 -0.09470652 -0.007876994  0.265652145  1.000000000 -0.01214362 -0.042663355 -0.106378231
Price       -0.44495073  0.58484777 -0.056698202  0.044536874 -0.012143620  1.00000000 -0.102176839  0.011746599
Age         -0.23181544 -0.10023882 -0.004670094 -0.004557497 -0.042663355 -0.10217684  1.000000000  0.006488032
Education   -0.05195524  0.02519705 -0.056855422 -0.033594307 -0.106378231  0.01174660  0.006488032  1.000000000
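As a small follow-up, base R's Filter function expresses the same column selection a little more directly, since it keeps only the columns for which the predicate returns TRUE (here every non-factor column is numeric, so the result matches the matrix above):

> cor(Filter(is.numeric, Carseats))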

R: ggplot - Plotting multiple variables on a line chart

In my continued playing around with meetup data I wanted to plot the number of members who join the Neo4j group over time.
I started off with the variable ‘byWeek’ which shows how many members joined the group each week:
> head(byWeek)
Source: local data frame [6 x 2]
 
        week n
1 2011-06-02 8
2 2011-06-09 4
3 2011-06-30 2
4 2011-07-14 1
5 2011-07-21 1
6 2011-08-18 1
I wanted to plot the actual count alongside a rolling average for which I created the following data frame:
library(zoo)
joinsByWeek = data.frame(actual = byWeek$n, 
                         week = byWeek$week,
                         rolling = rollmean(byWeek$n, 4, fill = NA, align=c("right")))
> head(joinsByWeek, 10)
   actual       week rolling
1       8 2011-06-02      NA
2       4 2011-06-09      NA
3       2 2011-06-30      NA
4       1 2011-07-14    3.75
5       1 2011-07-21    2.00
6       1 2011-08-18    1.25
7       1 2011-10-13    1.00
8       2 2011-11-24    1.25
9       1 2012-01-05    1.25
10      3 2012-01-12    1.75
The next step was to work out how to plot both ‘rolling’ and ‘actual’ on the same line chart. The easiest way is to make two calls to ‘geom_line’, like so:
ggplot(joinsByWeek, aes(x = week)) + 
  geom_line(aes(y = rolling), colour="blue") + 
  geom_line(aes(y = actual), colour = "grey") + 
  ylab(label="Number of new members") + 
  xlab("Week")
[Plot: weekly new member counts (grey) alongside the four-week rolling mean (blue)]
Alternatively we can make use of the ‘melt’ function from the reshape library…
library(reshape)
meltedJoinsByWeek = melt(joinsByWeek, id = 'week')
> head(meltedJoinsByWeek, 20)
   week variable value
1     1   actual     8
2     2   actual     4
3     3   actual     2
4     4   actual     1
5     5   actual     1
6     6   actual     1
7     7   actual     1
8     8   actual     2
9     9   actual     1
10   10   actual     3
11   11   actual     1
12   12   actual     2
13   13   actual     4
14   14   actual     2
15   15   actual     3
16   16   actual     5
17   17   actual     1
18   18   actual     2
19   19   actual     1
20   20   actual     2
…which then means we can plot the chart with a single call to geom_line:
ggplot(meltedJoinsByWeek, aes(x = week, y = value, colour = variable)) + 
  geom_line() + 
  ylab(label="Number of new members") + 
  xlab("Week Number") + 
  scale_colour_manual(values=c("grey", "blue"))
[Plot: the melted series on a single chart, coloured by variable]