Showing posts with label Pig. Show all posts
Showing posts with label Pig. Show all posts

Tuesday 21 October 2014

Hadoop MapReduce -- A Java vs Pig Example

In a recent post, I used Pig to analyze some MBS (mortgage-backed security) new-issue pool data from Fannie Mae.  At the time, I noted a number of errors processing the files in my data set, and now I'm going to go back and debug the job.  My goal is to clean up my parsing/processing logic sufficiently to get most, if not all, of the records included in my data.  Whether this is important or not depends on the task, of course.  If you are trying to reconcile records between two systems (e.g. to find lost revenue in a telecom billing system), you'd prefer not to lose any records.  If you are examining polling data to predict an election, you can probably afford to lose a few records.

I'm going to cheat a little, since I feel it's a little easier to debug in a Java MapReduce application than in Pig.  So I'm going to go back to Java, examine the data in a little more detail there, and then see what I could have done in Pig, had that been my only option.  I'll use the "new" MapReduce API available since Hadoop 0.20, referred to as "Context Objects".

To refresh your memory or to save you the trouble of reading my previous post, I'm looking through the files that Fannie Mae makes available after each business day, showing the new issues of mortgage pools from the day before.  I had processed the files to determine the total amount of unpaid balance (UPB) on a per-state basis.  Another important point I will repeat here is that these files contain records of several different types (and widths); you determine the record type by looking at column #2 and then referring to a file that contains the description for that record type.

Since my last post on this topic, a few more NIPS (New Issue Pool Statistics) files have been made available.  I import all of them into HDFS with

hadoop dfs -copyFromLocal /home/hduser/fannie-mae-nips /user/hduser

This set gives me data from 23 August to 26 December 2012.

 The Java MapReduce Job

I write a mapper and reducer, the main details of which follow.  First, my (Maven) project:

 mvn archetype:generate -DgroupId=com.example.fannieMaeNips
-DartifactId=fannie-mae-nips -DarchetypeArtifactId=maven-archetype-quickstart
-DinteractiveMode=false


Next, edit the pom.xml file and add the Hadoop dependency:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.1.0</version>
</dependency>


I drill down into the Java source directory, delete the Maven-supplied sample class, and create NipsMapReduce.java.  The new MapReduce API is found in the org.apache.hadoop.mapreduce package, rather than the oldorg.apache.hadoop.mapred.  I mention this again because the similarities between the classes in the two packages can lead to some odd errors at compile (or run) time if you use one package and "write to the other."  For more details, see online resources for Hadoop or Tom White's Hadoop:  The Definitive Guide.  

Here is the full source code for the mapper and reducer:

package com.example.fannieMaeNips;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NipsMapReduce
{

  static class NipsMapReduceMapper
    extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    private Text state = new Text();
    public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      // the default InputFormat for Mapper is TextInputFormat,
      // which splits a file into lines.
      // here, the key is the offset into the file;
      // value is the value of the current line of text:
      String nextLine = value.toString();
      // note the argument to java.lang.String.split() is a regex:
      String fields[] = nextLine.split("\\|");
      // are we looking at record type = 9 (Geographic Distribution)?
      if (fields != null && fields.length == 6 && Integer.parseInt(fields[1]) == 9) {
        // set the key to the name of the state:
        state.set(fields[2]);
        // parse the Aggregate Unpaid Balance field
        // "$ddd,ddd,ddd.dd" format into a double:
        double aggUPB =
          Double.parseDouble(fields[5].substring(1).replace(",", ""));
System.out.println("Adding upb " + aggUPB + " (" + fields[5] + ") to state " + fields[2]);
        // write the state and the aggregate unpaid balance
        // out to the Context:
        context.write(state, new DoubleWritable(aggUPB));
      }
    }
  }
  static class NipsMapReduceReducer
    extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
      throws IOException, InterruptedException {
      double sum = 0.0;
      for (DoubleWritable value: values) {
System.out.println("Summing " + value.get() + " into state '" + key + "'");
        sum += value.get();
      }
      context.write(key, new DoubleWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception
  {
    if (args.length != 2) {
      System.err.println("Usage: NipsMapReduce <input path> <output path>");
      System.exit(-1);
    }
    Job job = new Job();
    job.setJarByClass(NipsMapReduce.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(NipsMapReduceMapper.class);
    job.setReducerClass(NipsMapReduceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    System.exit(job.waitForCompletion(true)? 0:1);
  }
}
 


I've added a couple of comments to help, plus I put in some output to stdout to check in the log files after the job runs.  After I build the class, I export the path to the jarfile with HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=/home/hduser/dev/fannie-mae-nips/target/fannie-mae-nips-1.0-SNAPSHOT.jar

At which point, we can run:

bin/hadoop com.example.fannieMaeNips.NipsMapReduce /user/hduser/fannie-mae-nips /user/hduser/fannie-mae-nips-output

Hadoop is running in pseudo-distributed node, so the input and output paths are in HDFS.

There's a fair amount of output to stdout, generated by Hadoop, during the run.  My output to stdout is captured in the Hadoop logs.  Somewhere near the end of the output, we see the following:

12/12/27 16:17:16 INFO mapred.JobClient:     Reduce input groups=54
This number includes the 50 states, plus the District of Columbia, Puerto Rico, Guam and the U.S. Virgin Islands.

If I look at the Hadoop log files, I will see my output to stdout in the "stdout" files for each "attempt".  To see the "stdout" and "stderr" files, I switched to the userlogs output directory for this job:

cd $HADOOP_PREFIX/logs/userlogs/job_201212271420_0001
 

Here's some example output of my mapper, followed by sample output from my reducer:

...
Adding upb 339366.0 ($339,366.00) to state ILLINOIS
Adding upb 679920.0 ($679,920.00) to state KENTUCKY
Adding upb 440944.88 ($440,944.88) to state NORTH CAROLINA
Adding upb 624000.0 ($624,000.00) to state OHIO
Adding upb 194238.0 ($194,238.00) to state SOUTH CAROLINA
...

...
Summing 988363.58 into state 'MASSACHUSETTS'
Summing 372349.79 into state 'MASSACHUSETTS'
Summing 2015439.31 into state 'MASSACHUSETTS'
Summing 1237239.1 into state 'MASSACHUSETTS'
Summing 250000.0 into state 'MASSACHUSETTS'
...

 

Note how the mapper is processing states as they appear in the NIPS files, while this particular reducer has been given all the values for a particular key ("MASSACHUSETTS") as a group.  I also notice that all of my "stderr" files are empty, leading me to believe that I had no parse errors.  The directories of each attempt are links, so to recursively list them and grep for the stderr entries, I issued the followed command:

ls -lLR | grep stderr | less

which gives me a stream of output like the following:

...
-rw-r--r-- 1 hduser hadoop     0 Dec 27 16:16 stderr
-rw-r--r-- 1 hduser hadoop     0 Dec 27 16:16 stderr
-rw-r--r-- 1 hduser hadoop    0 Dec 27 16:17 stderr
-rw-r--r-- 1 hduser hadoop    0 Dec 27 16:12 stderr
-rw-r--r-- 1 hduser hadoop       0 Dec 27 16:12 stderr

... 

So it appears the Java MapReduce application parsed all the files with no errors.  This result makes me curious to try my earlier post's Pig queries against the same data set. 


Revisiting the Pig Query Example

From my earlier post, I retrieve and modify my Pig script to point to the current data set:


nips = load '/user/hduser/fannie-mae-nips' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr = filter nips by (recordType == 9);
fr_clean = foreach fr generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0 as total;
sortedUpb = order totalUpb by total;
dump sortedUpb;


Then, I run it with:

pig -f nipsDump.pig &> nipsDumpPig.log

First, let's compare a few values.  For California, my Java MapReduce program returned the following total:


bin/hadoop dfs -cat /user/hduser/fannie-mae-nips-output/part-r-00000 | grep CALIFORNIA
CALIFORNIA 8.911464282207979E10

while the Pig output, scaled by 10^6 (dollars), is


(CALIFORNIA,89114.64280685547)

In other words, identical.  Going back to the Pig log file, I see

2012-12-28 11:36:18,471 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 3348 time(s).
2012-12-28 11:36:18,476 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 41331 time(s).

and yet, after checking all 54 outputs of both jobs, they are identical.  And there were no errors reported in the Java MapReduce program.  A false alarm, in other words, due to my inexperience with Pig.

A quick Internet search shows that I have likely run into an issue related to the unique nature of the records in the NIPS files, namely, that they contain records of several types (and widths).  My issue probably occurs since my Pig parse command attempts to grab the first 6 elements of every row, and not every row has 6 elements.  This issue doesn't occur in the Java MapReduce program because I check the length of the array into which my line has been split before processing it.  This failure only generates a warning, of course, and so does not affect the results of the Pig query.

To test to see if this really is the case, I add an additional counter to the Java MapReduce to count the number of times that a row has less than 6 elements.  Since my mapper is outputting DoubleWritables, I increment the key's value by 1.0 on each hit to keep things simple.  It should be easy to find the "non existent field" warning.  Looking at Fannie Mae's NIPS file format, it appears there are 3 records types with fewer than 6 columns, and hopefully for this file set they will add up to 3348.

Finding the type-conversion warnings might be a little more challenging, and time-consuming (especially since this is mostly for my own amusement) -- I would need to check every non-interesting line to see if the fields don't match my expected types.  For example, for record type '01', column 5 is a character string, which would cause a failure, since I try to parse it as a float.  I think I will just check for the "fewer than 6 columns" issue as a sanity check.



Adding the following line to the mapper of my Java MapReduce program, after the "if" statement evaluating the number of columns:


      else if (fields != null && fields.length < 6) {
        state.set("ShortRecords");
        context.write(state, new DoubleWritable(1.0));
      }


and re-running with:

export HADOOP_CLASSPATH=/home/hduser/dev/fannie-mae-nips/target/fannie-mae-nips-1.0-SNAPSHOT.jar
bin/hadoop com.example.fannieMaeNips.NipsMapReduce /user/hduser/fannie-mae-nips /user/hduser/fannie-mae-nips-output2

I get the following output:


SOUTH DAKOTA    1.0024923021099998E9
ShortRecords    3348.0
TENNESSEE       4.426305956350002E9

right in order, as expected, between South Dakota and Tennessee.

When I started this post, it was intended to be about debugging MapReduce jobs.  That would be a nice post, although not very original.  It ended up being a post comparing Java MapReduce with Pig MapReduce; hopefully it was interesting.  If nothing else it shows how compact a Pig query can be, even though the Java code wasn't very verbose either, for this admittedly simple case.

Apache Pig -- Beginnings

I've been experimenting with using Pig on some Fannie-Mae MBS data lately.  While I don't mind writing MapReduce programs to process data (especially the fairly simple tasks I'm doing now), I really do appreciate the "magic" Pig does under the blanket, you might say.  If you don't know, Pig, a member of the Hadoop ecosystem (and now a first-class Apache project at pig.apache.org), is a framework for analyzing large data sets.  In this mini-tutorial we'll see how Pig works with Hadoop and HDFS, and just how much you can accomplish with only a few lines of script.  I am using Pig version 0.10.0 on Hadoop 1.1.0 (on Ubuntu 12.04, on VirtualBox 4.2.4, on Windows 7SP1, on the third floor of a tri-level at 1728 m above sea level, but that could change -- see this story about another "PIG").

I'll assume in this tutorial that you have Hadoop and Pig installed and that you are running Hadoop at least in pseudo-distributed mode.  If you're really fresh to both topics, I would recommend first looking at their respective Apache websites, and for getting Hadoop deployed and running, it really doesn't get any better than Michael Noll's posts on the subject.  For a single-node cluster (which is sufficient for following this tutorial), see his post Running Hadoop On Ubuntu Linux (Single-Node Cluster).  I am reading Tom White's Hadoop:  The Definitive Guide, which contains a very useful chapter on Pig.

First, the dataset

Before we start, let's look at the data we'll be parsing.  On the Fannie Mae website, you can find a page of the most-recent mortgage-pool issues (click on "New Issues Statistics).  Pipe-delimited files are available for each day for which issue data is available.  On this page, I'm most interested in the New Issue Pool Statistics, which I'll abbreviate NIPS.  These files are interesting in that they contain records in several different formats (a link on the above page refers to a document that describes the various record formats found in a NIPS file).  So, as you parse a NIPS file, you need to look at the 2nd column of data first, then refer to the file-format description file, to interpret the data.

As an example, I'm looking at the last few lines of the 9 November NIPS file.  I've included only the lines for one CUSIP, AQ7340:

AQ7340|01|3138MPEN1|11/01/2012|FNMS 02.5000 CI-AQ7340|$2,218,111.00|2.5||12/25/2012|U.S. BANK N.A.|U.S. BANK N.A.|9|247169.44|11/01/2027|||||||3.092|||1|180|179|92|779|0.0||0.0|CI  ||76.17|97AQ7340|02|MAX|375000.0|3.5|94.0|813|180|2|180
AQ7340|02|75%|317250.0|3.25|93.0|796|180|1|180
AQ7340|02|MED|241800.0|3.0|92.0|786|180|0|180
AQ7340|02|25%|209725.0|3.0|91.0|773|180|0|179
AQ7340|02|MIN|179500.0|2.875|90.0|697|180|0|178
AQ7340|03|REFINANCE|9|100.0|$2,218,111.30
AQ7340|04|1|9|100.0|$2,218,111.30
AQ7340|05|PRINCIPAL RESIDENCE|9|100.0|$2,218,111.30
AQ7340|08|2012|9|100.0|$2,218,111.30
AQ7340|09|GEORGIA|1|8.98|$199,118.84
AQ7340|09|ILLINOIS|1|9.52|$211,250.00
AQ7340|09|MICHIGAN|2|19.13|$424,312.98
AQ7340|09|MINNESOTA|2|24.93|$552,916.34
AQ7340|09|MISSOURI|1|10.82|$239,984.67
AQ7340|09|WASHINGTON|2|26.62|$590,528.47
AQ7340|10|U.S. BANK N.A.|9|100.0|$2,218,111.30
AQ7340|17|BROKER|1|16.91|$375,000.00
AQ7340|17|CORRESPONDENT|6|59.27|$1,314,611.30
AQ7340|17|RETAIL|2|23.83|$528,500.00

In this tutorial, we will be processing NIPS files to add up the total unpaid balances (UPBs) totaled on a per-state basis. Referring to the NIPS file layout description, I see I need to look at records where field #2 is "09". What we will want to do with this data is to accumulate the dollar amount of each UPB into a "state" key, over an entire NIPS file or set of NIPS files, and output the totals by state when we're done.

These are not huge datasets, of course.  But, for the purpose of creating an interesting tutorial, we'll process a small amount of NIPS data and see which states are seeing the most mortgage activity (at least as far as Fannie-Mae new issues are concerned).  The main point here is learning how to process the data and leverage the capabilities of Pig.

Loading the data into HDFS

I will start by downloading all of the available data on the Fannie Mae NIPS page to my local filesystem.  At the time of this tutorial, this included data from the 23rd of August (2012) through the 21st of November.  This set provides just a little under 400K lines of output.  The next step is to copy from my local storage to HDFS:


$ bin/hadoop dfs -copyFromLocal /home/hduser/dev/pigExamples/nipsData /user/hduser/pigExample

We can verify the transfer to HDFS with:

$ bin/hadoop dfs -ls /user/hduser/pigExample

Examining a single file with Pig

We're going to start by loading a single file and attempting to filter out lines whose record type is not "09".  I'm assuming you have installed Pig and it is configured to access HDFS.  Start the Pig interpreter:

hduser@ubuntu:~$ pig
2012-11-24 16:56:19,305 [main] INFO  org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-11-24 16:56:19,306 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/hduser/pig_1353801379300.log
2012-11-24 16:56:19,518 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2012-11-24 16:56:19,858 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
grunt&gt; 

You can see from the output that Pig knows I'm running Hadoop in (pseudo-)distributed mode.  If you don't see these, verify your PIG_CLASSPATH is set.  Next I'm going to load a single file from HDFS into Pig.  Pig assumes the field delimiter is a tab; since our file is pipe ("|") delimited, we will use PigStorage to override the default:

grunt&gt; NIPS_9Nov = load 'pigExample/nips_11092012.txt' using PigStorage('|');

(Note that the path is relative to '/user/hduser').  This load will not occur until the data is required; for example, right now a "dump" would cause the file to be loaded.  In fact, if you type

grunt&gt; dump NIPS_9Nov;

you will see a flurry of activity, related to the Hadoop MapReduce task(s) being created on your behalf, culminating with the actual output of the parsed-on-pipe-symbol text, of which the last few lines look like the following:

(AQ7340,09,GEORGIA,1,8.98,$199,118.84)
(AQ7340,09,ILLINOIS,1,9.52,$211,250.00)
(AQ7340,09,MICHIGAN,2,19.13,$424,312.98)
(AQ7340,09,MINNESOTA,2,24.93,$552,916.34)
(AQ7340,09,MISSOURI,1,10.82,$239,984.67)
(AQ7340,09,WASHINGTON,2,26.62,$590,528.47)
(AQ7340,10,U.S. BANK N.A.,9,100.0,$2,218,111.30)
(AQ7340,17,BROKER,1,16.91,$375,000.00)
(AQ7340,17,CORRESPONDENT,6,59.27,$1,314,611.30)
(AQ7340,17,RETAIL,2,23.83,$528,500.00)

This is good; this is what we want.  Next we'll want to look only at the record-type=09 fields, then accumulate balances on a per-state level.  In a fresh Pig shell, enter the following:

grunt&gt; nips_9nov = load '/user/hduser/pigExample/nips_11092012.txt' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
grunt&gt; fr_9nov = filter nips_9nov by (recordType == 9);
grunt&gt; dump fr_9nov;

This will produce the same output as before, only restricting it to the "record type = 09" fields.  Again, here is the tail of this output:

(AQ7337,9,WYOMING,1,2.7,$170,619.65)
(AQ7340,9,GEORGIA,1,8.98,$199,118.84)
(AQ7340,9,ILLINOIS,1,9.52,$211,250.00)
(AQ7340,9,MICHIGAN,2,19.13,$424,312.98)
(AQ7340,9,MINNESOTA,2,24.93,$552,916.34)
(AQ7340,9,MISSOURI,1,10.82,$239,984.67)
(AQ7340,9,WASHINGTON,2,26.62,$590,528.47)

At this point I'm going to change direction a little and put the Pig statements into a script, so it is a little easier to catch the output.  Create a new file called "pigTest.pig" and add the following lines to it:

nips_9nov = load '/user/hduser/pigExample/nips_11092012.txt' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr_9nov = filter nips_9nov by (recordType == 9);
dump fr_9nov;

Save the file and invoke it with:

pig -f pigTest.pig &amp;&gt; pigTest.log

Some of Pig's output goes to stderr, so you'll want to capture both stdout and stderr to your log file.  Open the log file and scroll down to:

 Job Stats (time in seconds):

and look at the next two lines, the first of which is a header.  Note that Pig only generated a Map job and no Reduce jobs (Maps = 1, Reduces = 0, Feature = "MAP_ONLY").  Since we are only loading records and filtering them based on a field characteristic, no Reduce job was necessary.

Next, we'll want to parse the aggregate unpaid balances for each mortgage, sum them by state, and output the totals.  The aggregate UPB is in the form of a human-readable, not-much-fun-to-parse bytearray (e.g. $3,759,464.16).  To treat these as floats we'll have to do a little cleanup.  This may not be terribly efficient, but I used a nested "REPLACE" function call:

fr_clean = foreach fr_9nov generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\$', ''), ',', '') as upbFloat;

Note that if you enter this expression in the Pig shell, you'll need two additional escape ("\") characters in front of the dollar sign (which, as in the java.lang.String.replaceAll() method, is interpreted as a regex). In a script, you'll need to escape both the dollar sign and the backslash.  Trust me.  fr_clean will now contain cleaned-up unpaid balances that look like real floats.  In the Pig shell, you can verify the schema of the relation (but not that the data will parse, as this has not happened yet) with the following:

grunt&gt; describe fr_clean;
2012-11-26 23:21:45,570 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 1 time(s).
2012-11-26 23:21:45,570 [main] WARN  org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
fr_clean: {poolNumber: bytearray,state: bytearray,numberOfLoans: int,percentageUpb: float,upbFloat: float}

The final steps to output the states (and the District of Columbia) with the total unpaid balances of all new issues (for this file, in millions of dollars) are:

grunt&gt; byState = group fr_clean by state;
grunt&gt; totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0;
grunt&gt; dump totalUpb;

I've glossed over these steps, but basically you are grouping by state and summing the unpaid balances on a per-state basis, scaling the totals by one million.  After the dump call is completed, we get 51 lines of output, the last few of which are here:

...
(CALIFORNIA,1021.2734624101563)
(NEW JERSEY,103.9833925234375)
(NEW MEXICO,18.8126310078125)
(WASHINGTON,153.9220293671875)
(CONNECTICUT,33.7019688515625)
(MISSISSIPPI,24.3124981796875)
(NORTH DAKOTA,7.280279875)
(PENNSYLVANIA,147.6614224453125)
(RHODE ISLAND,15.24327924609375)
(SOUTH DAKOTA,10.51592517578125)
(MASSACHUSETTS,156.1397877109375)
(NEW HAMPSHIRE,16.52243540234375)
(WEST VIRGINIA,8.3394678828125)
(NORTH CAROLINA,129.42278906640624)
(SOUTH CAROLINA,70.23617646875)
(DISTRICT OF COLUMBIA,18.288814109375)

In other words, California totaled slightly more than one billion dollars for the pools issued on the 9th of November in 2012.  

To wrap things up a little, I'll next run from a Pig script file.  I mentioned earlier we need to be a little careful about the escape character in the "REPLACE" call.  Here's the script to process a single file:

nips_9nov = load '/user/hduser/pigExample/nips_11092012.txt' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr_9nov = filter nips_9nov by (recordType == 9);
fr_clean = foreach fr_9nov generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0;
dump totalUpb;

Processing the entire dataset

There's not much left to do here but run against the entire dataset, which in our case is about three months' worth of new-issues files.  A slight modification to the script:

nips = load '/user/hduser/pigExample' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr = filter nips by (recordType == 9);
fr_clean = foreach fr generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0 as total;
sortedUpb = order totalUpb by total;
dump sortedUpb;

results in similar data (sorted in ascending order of total aggregate UPB), of course with only larger numbers.  For example, we see that during a three-month period starting in late August 2012, new Fannie-Mae pools representing $629M were issued for properties in Alaska.  You can also see from the output file that one Map and one Reduce job were created, and I have to admit, quite a number of records dropped (due to failure to parse):

2012-11-26 23:51:38,269 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 2396 time(s).
2012-11-26 23:51:38,269 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 30869 time(s).

On first inspection, it appears that 2396 "record type = 9" records actually didn't have enough fields to provide an aggregate unpaid balance column, and that I failed to successfully convert quite a few balances.  I did not investigate these records; however, such records generally tell you that you need to modify your parse logic.  In other words -- a good topic for another post!
Related Posts Plugin for WordPress, Blogger...