Tuesday, 21 October 2014

Some tips on R data frames

I just finished Roger Peng's Computing for Data Analysis course on Coursera (I highly recommend Roger, the class, and Coursera!).  R is the language used in this course, and it was my first exposure to actually having to make something work in R, on a deadline.  If you're new to R, one area where you quickly realize you're not in Kansas any more is R's data structures, and so far they're my favorite part of the language.  But I caught myself making a few mistakes early on, and let's just say things went better for me when I changed how I look at the whole data-manipulation process.

What is an R data frame?  My definition won't be especially formal, but it's essentially a collection of vectors, each of which describes a variable.  Each vector is the same length, and while all the elements of a vector must be of the same type, the vectors themselves can be of different types.  The key is that these vectors are what you would normally think of as the columns, not the rows, of a table.

If you are accustomed to working with tables (both in a programming language and in a relational database), you might like to think of data as a collection of rows, each of which consists of several different chunks of data which can be of any data type.  For an R data frame, it is more natural to think of a collection of columns, where you still have the ability to take a "horizontal slice" of the collection to get the familiar row, e.g., an observation of a measurement.

If you don't have R installed, now is a good time to do so.  R can be found at The R Project for Statistical Computing.  I have also downloaded a free, open-source high-quality IDE called RStudio, and I highly recommend you do the same.

In RStudio, you can bring up help on R data frames by entering

     ?data.frame

in the RStudio console window.  If you have never read the associated help page (which will pop up in the "Help" window), I would suggest you do that before continuing.  The short description you get there is:

This function creates data frames, tightly coupled collections of variables which share many of the properties of matrices and of lists, used as the fundamental data structure by most of R's modeling software.

Much better than I could have said!  Note that it refers to "This function...".  data.frame is actually an R function, and we'll see how to create a data frame right here.  First, we'll create a few vectors (type ?c to see a discussion of the function which combines elements into a vector; the details aren't really important right now), then create the data frame.  Enter the following into the RStudio console window (note the ">" symbols are the console prompt):


> state.code <- c("CA", "KY", "MD", "VA", "CO")
> years.lived <- c(3, 11, 4, 4, 9)
> returned.recently <- c(TRUE, FALSE, FALSE, FALSE, TRUE)
> residence.history <- data.frame(state.code, years.lived, returned.recently)


Above, I am using Google's R Style Guide in my naming conventions.  The above lines create vectors of character, numeric and logical data, respectively, then combine them to create a data frame.  After you create the data frame residence.history, look in RStudio's "Workspace" window -- you'll see the data frame described as "5 obs. of 3 variables".  R recognizes that you are tabulating 5 observations, which describe someone's residence history and whether or not he has recently returned to those states.
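To make the "collection of columns" idea concrete, here is a small sketch that rebuilds the same data frame and inspects it with a few base R functions.  The only thing I have added is stringsAsFactors = FALSE, which is my own choice so that the state codes stay character data regardless of the R version.

```r
state.code <- c("CA", "KY", "MD", "VA", "CO")
years.lived <- c(3, 11, 4, 4, 9)
returned.recently <- c(TRUE, FALSE, FALSE, FALSE, TRUE)
# stringsAsFactors = FALSE (my addition) keeps state.code as character
residence.history <- data.frame(state.code, years.lived, returned.recently,
                                stringsAsFactors = FALSE)

# A data frame is a list whose elements are the columns
print(is.list(residence.history))

# One type per column, but the types differ between columns
column.types <- sapply(residence.history, class)
print(column.types)

# Rows (observations) first, then columns (variables)
print(dim(residence.history))
```

The call to dim reports the same "5 observations of 3 variables" that RStudio shows in its workspace window.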

The fact that these columns are arranged into a collection of observations means that they must be the same length.  So, for example, if you tried to add an additional column, say of the primary city of residence in each state (you can add columns with cbind), that column would have to have 5 rows also.  If you forgot to include one observation for that variable, you would see something like the following:


> cbind(residence.history, c("Bakersfield", "Louisville", "Baltimore", "Arlington"))
Error in data.frame(..., check.names = FALSE) : 
  arguments imply differing number of rows: 5, 4
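For comparison, here is a sketch of adding the column with the right number of values.  The fifth city ("Denver") is made up purely so that the vector has five entries, and I am using the $ assignment form rather than cbind, which is just one of several ways to do this.

```r
residence.history <- data.frame(
  state.code = c("CA", "KY", "MD", "VA", "CO"),
  years.lived = c(3, 11, 4, 4, 9),
  returned.recently = c(TRUE, FALSE, FALSE, FALSE, TRUE),
  stringsAsFactors = FALSE
)

# Illustrative values only; "Denver" is an assumption to fill the fifth row
primary.city <- c("Bakersfield", "Louisville", "Baltimore", "Arlington", "Denver")

# Check the length up front instead of waiting for the error
stopifnot(length(primary.city) == nrow(residence.history))

# Assigning to a new name with $ appends the vector as a column
residence.history$primary.city <- primary.city
str(residence.history)
```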

Building data frames

The typical example for building a data frame in R -- creating a set of columns and binding them with the data.frame function -- is handy but not very illustrative of your typical experience.  For this reason, as a mostly-Java developer, the first time I needed to create a data frame, I got a little off-track.  Here's my initial attempt:

  • Create an empty data frame
  • Start a loop over a collection of data
  • In the loop, for each value, perform some computations, etc.
  • Use the row-binding function, rbind, to add the row to my data frame
  • Exit the loop with a populated data frame
This phase didn't last very long.  Let's try it, without the loop first, and see why.  First, I'll create an R list (lists can contain values of different types, while vectors must contain values of the same type), then take a look at it with the str() function:

> my.first.row <- list("CA", "Bakersfield", TRUE)
> str(my.first.row)
List of 3
 $ : chr "CA"
 $ : chr "Bakersfield"
 $ : logi TRUE

Then I'll row-bind it into a new data frame and verify that the data frame looks as expected:

> new.data.frame <- data.frame()
> new.data.frame <- rbind(new.data.frame, my.first.row)
> str(new.data.frame)
'data.frame': 1 obs. of  3 variables:
 $ X.CA.         : Factor w/ 1 level "CA": 1
 $ X.Bakersfield.: Factor w/ 1 level "Bakersfield": 1
 $ TRUE.         : logi TRUE

So far, this looks good.  I have one observation of my 3 variables.  Note that the state code and the city are factors, rather than character strings.  This is the default behavior in R when building a data frame (in versions before R 4.0.0; from 4.0.0 on, stringsAsFactors defaults to FALSE).  If you don't know what a factor is, it's similar to the allowed values specified in an enum.  This will be important in a minute.

Let's try to add that hypothetical second row we might generate inside an iterator like a for-loop:


> my.second.row <- list("KY", "Louisville", FALSE)
> new.data.frame <- rbind(new.data.frame, my.second.row)
> str(new.data.frame)
'data.frame': 2 obs. of  3 variables:
 $ X.CA.         : Factor w/ 1 level "CA": 1 NA
 $ X.Bakersfield.: Factor w/ 1 level "Bakersfield": 1 NA
 $ TRUE.         : logi  TRUE FALSE
> new.data.frame
   X.CA. X.Bakersfield. TRUE.
2     CA    Bakersfield  TRUE
21  <NA>           <NA> FALSE


OK, my second row did not get absorbed as I expected.  From rooting around a little, I learned this happens because R, having identified the first two columns as factors, "fixed" the set of allowed values for the state-code and city factors and rejected the new values in the second row.  In other words, it is similar to what would happen if you tried to inject a new value into a field reserved for an enum.


There's a way around this in R -- you can simply specify that you don't want your character vectors interpreted as factors when you create a data frame.  You do this by specifying "stringsAsFactors = FALSE" when you create the data frame, but note, importantly -- this setting only applies to the character vectors (i.e., columns) passed in that call to data.frame.  The data frame does not remember it, so it does not help you when you later rbind rows to the data frame.  As an example of how this doesn't help, let's try again:


> rm(new.data.frame)
> ?rbind
> new.data.frame <- data.frame(stringsAsFactors = FALSE)
> new.data.frame <- rbind(new.data.frame, my.first.row)
> new.data.frame <- rbind(new.data.frame, my.second.row)
> new.data.frame
   X.CA. X.Bakersfield. TRUE.
2     CA    Bakersfield  TRUE
21  <NA>           <NA> FALSE
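For what it's worth, here is one sketch of keeping the row-at-a-time idea while avoiding the factor problem: turn each row into its own one-row data frame with stringsAsFactors = FALSE, then bind them all in a single call.  The column names (state.code, city, returned.recently) are ones I made up for illustration.

```r
# Each row is a named list, as it might come out of a loop body
rows <- list(
  list(state.code = "CA", city = "Bakersfield", returned.recently = TRUE),
  list(state.code = "KY", city = "Louisville", returned.recently = FALSE)
)

# Convert each list into a one-row data frame with character columns
row.frames <- lapply(rows, function(r) {
  data.frame(r, stringsAsFactors = FALSE)
})

# Bind all of the one-row data frames in a single rbind call
new.data.frame <- do.call(rbind, row.frames)
str(new.data.frame)
```

Because every piece is already a data frame with character columns, rbind just concatenates the columns and there are no factor levels to violate.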


At this point, I did a quick search on the internet, and after getting over the slightly-annoying, always-first, single-line responses "Why would you want to do that?", I learned something about R I did not know before.  Generally in R, you are expected to operate over all the elements of a vector or list with functions which return a vector or list.  If the final set of vectors are all of the same length, you can then create a data frame in one function call, passing in the vectors.  (Note:  this is a gross simplification of the issue, but it's more "in the right direction" than my earlier attempts)

This approach won't work with the above example, since I'm basically building the data by hand as I go.  But here's another example which illustrates what I'm talking about.  Suppose I have a vector of some state codes and a couple of R functions which, when passed the state code, return the state capital and the date the state entered the union.  Below is a partial implementation, using the states I have been working with so far in this post.  First, the functions:


GetCapital <- function(stateCode) {
  codes <- c("CA", "KY", "MD", "VA", "CO")
  capitals <- c("Sacramento", "Frankfort", "Annapolis", "Richmond", "Denver")
  capitals[which(codes == stateCode)]
}


GetAdmissionDate <- function(stateCode) {
  codes <- c("CA", "KY", "MD", "VA", "CO")
  dates <- c(1850, 1792, 1788, 1788, 1876)
  dates[which(codes == stateCode)]
}
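The two functions above handle one state code at a time.  As a sketch of the more vector-oriented style, here is a hypothetical variant (GetCapitals is my own name, not part of the original code) that accepts a whole vector of codes by using match to look up positions.

```r
# Hypothetical vectorized variant of GetCapital
GetCapitals <- function(state.codes) {
  codes <- c("CA", "KY", "MD", "VA", "CO")
  capitals <- c("Sacramento", "Frankfort", "Annapolis", "Richmond", "Denver")
  # match() gives the position of each requested code, or NA if it is unknown
  capitals[match(state.codes, codes)]
}

result <- GetCapitals(c("MD", "CA", "TX"))
print(result)
```

A code that is not in the table ("TX" here) comes back as NA instead of a zero-length result, which keeps the output the same length as the input.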

Now, I will start with a vector of the state codes, and before I create the data frame, just illustrate what happens when you sapply GetAdmissionDate to the vector of state codes:

> states.lived <- c("CA", "KY", "MD", "VA", "CO")
> admission.dates <- sapply(states.lived, GetAdmissionDate)
> str(admission.dates)
 Named num [1:5] 1850 1792 1788 1788 1876
 - attr(*, "names")= chr [1:5] "CA" "KY" "MD" "VA" ...


In other words, sapply applies the GetAdmissionDate function to each element of the states.lived vector and returns a vector of the same length.  Now you can see why it's so easy and compact to create a data frame containing five observations, each containing the state code, the state capital, and the year the state was admitted to the union:

> my.data.frame <- data.frame(states.lived, sapply(states.lived, GetCapital), sapply(states.lived, GetAdmissionDate))
> my.data.frame
   states.lived sapply.states.lived..GetCapital.
CA           CA                       Sacramento
KY           KY                        Frankfort
MD           MD                        Annapolis
VA           VA                         Richmond
CO           CO                           Denver
   sapply.states.lived..GetAdmissionDate.
CA                                   1850
KY                                   1792
MD                                   1788
VA                                   1788
CO                                   1876

Note that the row names are the state codes (they are taken from the names that sapply attaches to its results), and the column names, much like the results of a SQL operation, default to the expression used to compute the column values.  If I tweak things a little to ensure the row names are numbers, that character vectors are kept as they are rather than converted to factors, and that the column names are ones I specify, I get the following output:


> my.data.frame <- data.frame(states.lived, sapply(states.lived, GetCapital), sapply(states.lived, GetAdmissionDate), row.names = c(1:length(states.lived)), stringsAsFactors = FALSE)
> colnames(my.data.frame) <- c("State code", "Capital", "Admission Date")
> my.data.frame
  State code    Capital Admission Date
1         CA Sacramento           1850
2         KY  Frankfort           1792
3         MD  Annapolis           1788
4         VA   Richmond           1788
5         CO     Denver           1876
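The same result can be reached in one step by naming the columns inside the data.frame call.  The sketch below also swaps sapply for vapply, which is my substitution rather than what I used above: vapply checks the type of each result, and USE.NAMES = FALSE stops the state codes from becoming row names.  The column names are syntactic versions I chose for illustration.

```r
GetCapital <- function(stateCode) {
  codes <- c("CA", "KY", "MD", "VA", "CO")
  capitals <- c("Sacramento", "Frankfort", "Annapolis", "Richmond", "Denver")
  capitals[which(codes == stateCode)]
}

GetAdmissionDate <- function(stateCode) {
  codes <- c("CA", "KY", "MD", "VA", "CO")
  dates <- c(1850, 1792, 1788, 1788, 1876)
  dates[which(codes == stateCode)]
}

states.lived <- c("CA", "KY", "MD", "VA", "CO")

# Named arguments become the column names; vapply enforces one value per state
my.data.frame <- data.frame(
  state.code = states.lived,
  capital = vapply(states.lived, GetCapital, character(1), USE.NAMES = FALSE),
  admission.date = vapply(states.lived, GetAdmissionDate, numeric(1),
                          USE.NAMES = FALSE),
  stringsAsFactors = FALSE
)
print(my.data.frame)
```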

This has admittedly been a really brief introduction to the creation and population of R data frames, but I hope I got across the main point I wanted to present, which is that you would do better to approach creating a data structure in the context of the vector/list-oriented functions typically used in R to process collections of data.  Try to avoid using an explicit iterator (e.g. a for-loop) over a collection whenever you can, and you'll be off to a good start.

Accessing elements in data frames

This was another area I felt was slightly tricky, coming from a different programming language.  There's more going on here than just accessing individual elements -- there's also the concept of subsetting a data frame, and there are two ways to subset a data frame.  I will cover this fairly broadly, but hopefully it will be enough information to save someone a few of the misunderstandings I had on my first exposure.

First, bracket notation...  In the data frame I just created above, my.data.frame, I can get the capital of Kentucky by accessing element 2 of row 2 (note: indices start from 1, not 0, in R):

> my.data.frame[2,2]
[1] "Frankfort"

Another interesting thing you can do with a data frame is to say "give me column 2; the row doesn't matter".  You do this with the same notation, except that you neglect to specify the row:


> my.data.frame[,2]
[1] "Sacramento" "Frankfort"  "Annapolis"  "Richmond"   "Denver" 

A similar approach applies to getting a row:

> my.data.frame[3,]
  State code   Capital Admission Date
3         MD Annapolis           1788
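One detail worth seeing in code is that asking for a single column returns a plain vector, while asking for a single row returns a data frame.  The sketch below rebuilds the data frame by hand (check.names = FALSE is needed to keep the spaces in the column names) and shows the drop = FALSE argument, which keeps a single column as a data frame.

```r
my.data.frame <- data.frame(
  "State code" = c("CA", "KY", "MD", "VA", "CO"),
  "Capital" = c("Sacramento", "Frankfort", "Annapolis", "Richmond", "Denver"),
  "Admission Date" = c(1850, 1792, 1788, 1788, 1876),
  check.names = FALSE,
  stringsAsFactors = FALSE
)

# A single column comes back as a character vector by default
capitals.vector <- my.data.frame[, 2]
print(class(capitals.vector))

# drop = FALSE keeps the result as a one-column data frame
capitals.frame <- my.data.frame[, 2, drop = FALSE]
print(class(capitals.frame))

# A single row is still a data frame, since its values have mixed types
third.row <- my.data.frame[3, ]
print(class(third.row))
```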

Where things start to get really "functional" is when you realize you can pass in sequences of indices, or logical tests.  For example, if I wanted to get only the state code and the date of admission for California and Colorado, I would execute the following:


> my.data.frame[c(1,5),c(1,3)]
  State code Admission Date
1         CA           1850
5         CO           1876

Here I've specified rows 1 and 5 and columns 1 and 3.  If I wanted the first 3 rows, I could have specified the vector c(1:3).  In fact, in the case of a range, I don't even need the vector.  To return rows 2 through 4 and columns 2 through 3, I could say:

> my.data.frame[2:4,2:3]
    Capital Admission Date
2 Frankfort           1792
3 Annapolis           1788
4  Richmond           1788

We are not restricted to specifying just indices; we can also subset with the square-bracket operator using logical expressions.  For example, if I wanted to extract the capital and date of admission of states entering the union after 1800, I could do the following:

> my.data.frame[my.data.frame$"Admission Date" > 1800, 2:3]
     Capital Admission Date
1 Sacramento           1850
5     Denver           1876

Since I inserted a space in the name of the third column, I have to quote the name after the $ operator so that R parses it as a single column name.
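Here is a sketch of the same kind of selection written with column names instead of positions.  It uses the [[ operator, which takes the column name as an ordinary string, so the space in the name needs no special treatment.  The data frame is rebuilt by hand so the example stands alone.

```r
my.data.frame <- data.frame(
  "State code" = c("CA", "KY", "MD", "VA", "CO"),
  "Capital" = c("Sacramento", "Frankfort", "Annapolis", "Richmond", "Denver"),
  "Admission Date" = c(1850, 1792, 1788, 1788, 1876),
  check.names = FALSE,
  stringsAsFactors = FALSE
)

# The logical test produces one TRUE/FALSE per row
after.1800 <- my.data.frame[["Admission Date"]] > 1800
print(after.1800)

# Rows are chosen by the logical vector, columns by name
result <- my.data.frame[after.1800, c("Capital", "Admission Date")]
print(result)
```

Selecting columns by name has the side benefit that the code keeps working if the column order changes later.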

R also has a function called subset().  Look at its documentation with ?subset, but the general form we're going to use here is

subset(dataFrame, logicalExpression, columnsToExtract)

Using subset, I could extract only the state code and date of admission of all states in the data frame which entered the union before 1800:


> subset(my.data.frame, my.data.frame$"Admission Date"<1800, c("State code", "Admission Date"))
  State code Admission Date
2         KY           1792
3         MD           1788
4         VA           1788

Note that the parameters in subset are named, so if you merely want to select some columns and don't want to provide a logical test, you don't have to worry about the missing "subset" argument, as long as you name the "select" argument.  In other words,

> subset(my.data.frame, select = c("State code", "Admission Date"))
  State code Admission Date
1         CA           1850
2         KY           1792
3         MD           1788
4         VA           1788
5         CO           1876

works, while 


> subset(my.data.frame, c("State code", "Admission Date"))
Error in subset.data.frame(my.data.frame, c("State code", "Admission Date")) : 
  'subset' must evaluate to logical

does not, because subset thinks that you are trying to pass a character vector as a logical expression (that is, to the "subset" argument).
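One more convenience of subset is that the logical expression is evaluated inside the data frame, so the column can be named directly without repeating my.data.frame$.  In this sketch the backticks are needed only because my column name contains a space.

```r
my.data.frame <- data.frame(
  "State code" = c("CA", "KY", "MD", "VA", "CO"),
  "Capital" = c("Sacramento", "Frankfort", "Annapolis", "Richmond", "Denver"),
  "Admission Date" = c(1850, 1792, 1788, 1788, 1876),
  check.names = FALSE,
  stringsAsFactors = FALSE
)

# The column is referred to by its bare (backticked) name in the test
early.states <- subset(my.data.frame, `Admission Date` < 1800,
                       select = c("State code", "Admission Date"))
print(early.states)
```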


There is a lot you can do with the square-bracket operator and the subset function; I've only touched on the possibilities here.  I recommend you read the R documentation pages and look at a couple of references, online or otherwise, to become more familiar.  If you're going to work with data frames, you really need to know how to get at the data once you've created one!

Hadoop/R Integration: Streaming

If you've spent any time with MapReduce frameworks in general, by now you probably know the word-count example is the MapReduce equivalent of "Hello World!".  In earlier posts, I tried to be slightly different, but with an equally-simple problem - counting up the total dollar value, by state, of new issues of mortgage-backed securities pooled by Fannie Mae.

I have used "straight" Java and Pig so far, and now I'll turn my attention to R.  After our example, we'll discuss what makes R unique in this situation, and why a word-count type of example doesn't really do R justice.  In advance, I'll mention my main references used here are Joseph Adler's R in a Nutshell (see chapter 26) and Tom White's Hadoop: The Definitive Guide (Chapter 2).

There are a number of ways to use R with Hadoop, including:
  • Hadoop streaming, the subject of this post
  • RHadoop, an R/Hadoop integration (see the RHadoop Wiki), the subject of a future post
  • RHIPE (pronounced hree-pay), another R/Hadoop integration.
Because of the breadth of topics I'm trying to cover in this blog, I'm going to restrict myself to streaming and RHadoop for this topic.

Overview

In Hadoop streaming, your mapper, reducer, and optional combiner processes are written to read from standard input and to write to standard output.  Once the process scripts and data are ready, you simply invoke Hadoop using its streaming binaries along with some command-line properties.

As in previous posts, I'll be taking data from Fannie Mae's New Issue Pool Statistics (NIPS) files.  For more info, see a previous post.  I'll be using the same data as in that post, so we can expect an exact match on the results.

The Mapper

NIPS files are a little interesting, in that they contain a number of differently-formatted records (check here for the formats).  To perform our analysis, we will be looking at record type 9, "GEOGRAPHIC DISTRIBUTION".  We will be interested in columns 3 (state name) and 6 (aggregate unpaid balance).  Since numerous record formats are mixed within a single file, we'll first split the file on the pipe delimiters and discard the records that are not of type 9.  All we need to do is output the state name and the aggregate unpaid balance, one instance per type-9 line.

To develop my R scripts, I'm using RStudio, an IDE I learned of through Roger Peng's Computing for Data Analysis course on Coursera.  After an interactive script-building session in RStudio, I produced the following test script:

#! /usr/bin/env Rscript

conn <- file("/home/hduser/fannie-mae-nips/nips_12262012.txt", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  if (as.numeric(split.line[[1]][2]) == 9) {
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)

which I then invoked from a shell and got the following output, truncated:

CALIFORNIA    167300.00
FLORIDA    395950.00
GEORGIA    69500.00
ILLINOIS    235200.00
MICHIGAN    781950.00
NEW JERSEY    284550.00
OHIO    334175.00


Since this looks clean, I modified the mapper slightly to produce the final version:

#! /usr/bin/env Rscript

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\\|")
  if (as.numeric(split.line[[1]][2]) == 9) {
    write(paste(split.line[[1]][3],
                gsub("[$,]", "", split.line[[1]][6]), sep="\t"), stdout())
  }
}
close(conn)


Note the interesting subscripting to get the results of strsplit:  strsplit returns a list, so field 2 of the file record is actually element 2 of the first element of the list, which is a vector of parsed fields.  If you need some clarification of this result, see the "Subscripting" chapter from Phil Spector's Data Manipulation with R.  Also note the compact usage of gsub to remove the dollar signs and commas from the aggregate unpaid balances.
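To see the subscripting in isolation, here is a sketch that runs the same parsing steps on a single line.  The sample record is invented for illustration (only the positions of fields 2, 3 and 6 follow the description above); a real NIPS record will look different.

```r
# Made-up pipe-delimited record; not actual NIPS data
sample.line <- "AB1234|9|CALIFORNIA|1|1.00|$167,300.00"

# strsplit returns a list with one element per input string
split.line <- strsplit(sample.line, "\\|")
print(class(split.line))

# The first (and only) list element is the vector of parsed fields
fields <- split.line[[1]]
record.type <- as.numeric(fields[2])
state <- fields[3]

# Strip dollar signs and commas from the balance
upb <- gsub("[$,]", "", fields[6])

output.line <- paste(state, upb, sep = "\t")
print(output.line)
```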

The Reducer

Our reducer will also be reading from stdin, with the following guaranteed by the Hadoop runtime:
  • If a key is encountered by the reducer, then the reducer knows that all records with that key are being sent to this reducer, so it can produce an output with the knowledge that it has been given all the records for that key;
  • Incoming records are sorted by key, so the reducer knows that, when a key changes, then all records for the previous key have already been encountered in the stream.
In our reducer, we will have a couple of variables: one to keep track of which key is being processed, and one to hold the running total of aggregate unpaid balances for mortgages from a given state.  Once a key changes, we will output the current balance running total (with its key) and reset the running balance:

#! /usr/bin/env Rscript

current.key <- NA
current.upb <- 0.0

conn <- file("stdin", open="r")
while (length(next.line <- readLines(conn, n=1)) > 0) {
  split.line <- strsplit(next.line, "\t")
  key <- split.line[[1]][1]
  upb <- as.numeric(split.line[[1]][2])
  if (is.na(current.key)) {
    current.key <- key
    current.upb <- upb
  }
  else {
    if (current.key == key) {
      current.upb <- current.upb + upb
    }
    else {
      write(paste(current.key, current.upb, sep="\t"), stdout())
      current.key <- key
      current.upb <- upb
    }
  }
}
write(paste(current.key, current.upb, sep="\t"), stdout())
close(conn)
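As a cross-check on the running-total logic, the same per-key sums can be computed in memory with tapply.  This is only a sketch for testing on small inputs, not a replacement for the streaming reducer, and the mapper output lines below are made up.

```r
# Hypothetical mapper output (key, tab, value); the values are invented
mapper.lines <- c("OHIO\t100.50", "CALIFORNIA\t200.00",
                  "OHIO\t50.25", "CALIFORNIA\t1.00")

# Split every line at the tab, then pull out keys and balances
fields <- strsplit(mapper.lines, "\t")
keys <- vapply(fields, `[`, character(1), 1)
upbs <- as.numeric(vapply(fields, `[`, character(1), 2))

# Sum the balances within each key; no pre-sorting is needed here
totals <- tapply(upbs, keys, sum)

output.lines <- paste(names(totals), totals, sep = "\t")
print(output.lines)
```

Unlike the streaming reducer, this holds all of the data in memory at once, which is exactly what the key-sorted stream lets the reducer avoid.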

Now, I'd like to test this reducer on a single file, but I have a small issue -- my mapper does not sort the output (it doesn't need to, of course), but my reducer expects the data to be sorted by key.  I could just wait and see how the final numbers come out, but since streaming just involves piping stdout to stdin, I'm a little curious about how fast this task could be run outside of Hadoop (I'm not really comparing, for a simple single-node cluster; I'm just curious).  And I'm still learning R, so I next write a script to sort the rows by record key:

#! /usr/bin/env Rscript

conn <- file("stdin", open="r")
all.lines <- readLines(conn)
write(sort(all.lines), stdout())
close(conn)


At times like this, I remember why I like R so much!  Next, I process a single file with my "test" version of the mapper:

./map.test.R | ./map.output.sorter.R | ./reduce.R

and get output like the following (abbreviated) for a single NIPS file:

ALABAMA 72699735.21
ALASKA  6883209.62
ARIZONA 287482321.1
ARKANSAS        21579003.98
CALIFORNIA      1811342276.77
...
VIRGIN ISLANDS  1021750
WASHINGTON      239648997.97
WEST VIRGINIA   9925894.94
WISCONSIN       72752945.87
WYOMING 6232557.56


Streaming in Hadoop with R

Now that we have a mapper and a reducer, we can process the entire data set in Hadoop.  I will process the same set of data as I did in my previous Hadoop-Java-Pig comparison post, meaning NIPS data from 23 August to 26 December 2012.  As in that post, I am running Hadoop in pseudo-distributed mode, with the data coming from HDFS.  The difference here, of course, is that I am specifying streaming, and providing my mapper and reducer R scripts.  I launch from the Hadoop home directory:

bin/hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.1.0.jar -input /user/hduser/fannie-mae-nips -output /user/hduser/fannie-mae-nips-r-output -mapper /home/hduser/RScripts/map.R -reducer /home/hduser/RScripts/reduce.R

So, what did I get for my efforts?  Copying my results file from HDFS:

bin/hadoop dfs -copyToLocal /user/hduser/fannie-mae-nips-r-output/part-00000 rResults.txt

yields the following output (abbreviated here):

ALABAMA 3242681838.89999
ALASKA  841797447.200001
ARIZONA 9340767235.06001
ARKANSAS        1452136751.9
CALIFORNIA      89114642822.0799
...
VERMONT 553060435.67
VIRGIN ISLANDS  33604327.46
VIRGINIA        12706719836.48
WASHINGTON      13194248475.54
WEST VIRGINIA   486889587.57
WISCONSIN       8140391871.79
WYOMING 720905726.84


I still have the outputs from my previous post on this same data set, using Java and Pig; perusing them shows the following (note I did not diff the files because the numbers were output in a different format):

ALABAMA 3.242681838899994E9
ALASKA  8.417974472000003E8
ARIZONA 9.340767235060005E9
ARKANSAS        1.452136751900001E9
CALIFORNIA      8.91146428220799E10
...
VERMONT 5.530604356700001E8
VIRGIN ISLANDS  3.360432746000001E7
VIRGINIA        1.2706719836479996E10
WASHINGTON      1.319424847554002E10
WEST VIRGINIA   4.868895875700002E8
WISCONSIN       8.140391871790002E9
WYOMING 7.209057268400007E8
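Since the two outputs differ only in number formatting, a numeric comparison is easy to sketch in R.  Below I take the ALABAMA values from the two listings above, parse both as numbers, and compare them with a tolerance; sprintf then renders both in the same fixed format.

```r
# ALABAMA totals copied from the R output and the Java/Pig output above
r.value <- as.numeric("3242681838.89999")
java.value <- as.numeric("3.242681838899994E9")

# all.equal compares with a small relative tolerance rather than exactly
same <- isTRUE(all.equal(r.value, java.value))
print(same)

# A fixed two-decimal format makes the two values directly comparable as text
formatted <- sprintf("%.2f", c(r.value, java.value))
print(formatted)
```

Applying the same sprintf format in the reducer's write call would be one way to make the output files diff-able, though I have not done that here.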


So, I successfully duplicated the Java and Pig examples using R and Hadoop streaming.

Final Comments about Hadoop and R

If you are at all familiar with R, you understand that R isn't a language you pick up just to split lines of output and sum numbers; the language and its libraries contain a wealth of functionality.  The point of this post was primarily to work through the mechanical details of using R with Hadoop streaming.  Where R would really shine is if we had some "heavy lifting" to do with R that was easily decomposable into map-style and reduce-style tasks.  For example, if you were fitting a linear regression against a huge data set, using a large number of variables, or if you were performing a Shapiro-Wilk test against a large data set, the ability to split up the job into parallel tasks, combining them at the end with a reducer, would be a great example of Hadoop/R synergy.  For more information on parallel computation in R, see chapter 26 of Joseph Adler's R in a Nutshell, especially his "Where to Learn More" section at the end of the chapter.
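To give a flavor of what "decomposable" means for the regression case, here is a single-machine sketch on simulated data; it does not involve Hadoop at all.  Each chunk of rows contributes its own X'X and X'y (the map-style step), the pieces are added together (the reduce-style step), and the normal equations are solved once at the end.  The data, the chunk count and the variable names are all assumptions made for the illustration.

```r
set.seed(42)
n <- 1000

# Simulated design matrix: an intercept column plus two predictors
x <- cbind(1, rnorm(n), rnorm(n))
y <- drop(x %*% c(2, -1, 0.5)) + rnorm(n, sd = 0.1)

# "Map": each chunk of rows yields its own X'X and X'y
chunk.ids <- split(seq_len(n), rep(1:4, length.out = n))
partials <- lapply(chunk.ids, function(idx) {
  xi <- x[idx, , drop = FALSE]
  list(xtx = crossprod(xi), xty = crossprod(xi, y[idx]))
})

# "Reduce": the per-chunk pieces simply add up
xtx <- Reduce(`+`, lapply(partials, `[[`, "xtx"))
xty <- Reduce(`+`, lapply(partials, `[[`, "xty"))
beta.hat <- solve(xtx, xty)

# Compare with fitting on the full data set in one go
full.fit <- coef(lm(y ~ x[, 2] + x[, 3]))
print(cbind(chunked = as.numeric(beta.hat), full = as.numeric(full.fit)))
```

The per-chunk summaries are tiny (a 3-by-3 matrix and a 3-element vector here) no matter how many rows a chunk holds, which is what makes this kind of job a natural fit for a map step followed by a reducer.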