Tuesday 21 October 2014

Apache Pig -- Beginnings

I've been experimenting with using Pig on some Fannie Mae MBS data lately.  While I don't mind writing MapReduce programs to process data (especially for the fairly simple tasks I'm doing now), I really do appreciate the "magic" Pig does under the blanket, you might say.  If you don't know it, Pig, a member of the Hadoop ecosystem (and now a first-class Apache project at pig.apache.org), is a framework for analyzing large data sets.  In this mini-tutorial we'll see how Pig works with Hadoop and HDFS, and just how much you can accomplish with only a few lines of script.  I am using Pig version 0.10.0 on Hadoop 1.1.0 (on Ubuntu 12.04, on VirtualBox 4.2.4, on Windows 7 SP1, on the third floor of a tri-level at 1728 m above sea level, but that could change -- see this story about another "PIG").

I'll assume in this tutorial that you have Hadoop and Pig installed and that you are running Hadoop at least in pseudo-distributed mode.  If you're new to both topics, I would recommend first looking at their respective Apache websites; for getting Hadoop deployed and running, it really doesn't get any better than Michael Noll's posts on the subject.  For a single-node cluster (which is sufficient for following this tutorial), see his post Running Hadoop On Ubuntu Linux (Single-Node Cluster).  I am reading Tom White's Hadoop: The Definitive Guide, which contains a very useful chapter on Pig.

First, the dataset

Before we start, let's look at the data we'll be parsing.  On the Fannie Mae website, you can find a page of the most-recent mortgage-pool issues (click on "New Issues Statistics").  Pipe-delimited files are available for each day for which issue data exists.  On this page, I'm most interested in the New Issue Pool Statistics files, which I'll abbreviate as NIPS.  These files are interesting in that they contain records in several different formats (a link on the above page refers to a document that describes the various record formats found in a NIPS file).  So, as you parse a NIPS file, you need to look at the second column of each record first, then refer to the file-format description to interpret the rest of the data.

As an example, here are the last few lines of the 9 November NIPS file.  I've included only the lines for one pool, AQ7340:

AQ7340|01|3138MPEN1|11/01/2012|FNMS 02.5000 CI-AQ7340|$2,218,111.00|2.5||12/25/2012|U.S. BANK N.A.|U.S. BANK N.A.|9|247169.44|11/01/2027|||||||3.092|||1|180|179|92|779|0.0||0.0|CI  ||76.17|97
AQ7340|02|MAX|375000.0|3.5|94.0|813|180|2|180
AQ7340|02|75%|317250.0|3.25|93.0|796|180|1|180
AQ7340|02|MED|241800.0|3.0|92.0|786|180|0|180
AQ7340|02|25%|209725.0|3.0|91.0|773|180|0|179
AQ7340|02|MIN|179500.0|2.875|90.0|697|180|0|178
AQ7340|03|REFINANCE|9|100.0|$2,218,111.30
AQ7340|04|1|9|100.0|$2,218,111.30
AQ7340|05|PRINCIPAL RESIDENCE|9|100.0|$2,218,111.30
AQ7340|08|2012|9|100.0|$2,218,111.30
AQ7340|09|GEORGIA|1|8.98|$199,118.84
AQ7340|09|ILLINOIS|1|9.52|$211,250.00
AQ7340|09|MICHIGAN|2|19.13|$424,312.98
AQ7340|09|MINNESOTA|2|24.93|$552,916.34
AQ7340|09|MISSOURI|1|10.82|$239,984.67
AQ7340|09|WASHINGTON|2|26.62|$590,528.47
AQ7340|10|U.S. BANK N.A.|9|100.0|$2,218,111.30
AQ7340|17|BROKER|1|16.91|$375,000.00
AQ7340|17|CORRESPONDENT|6|59.27|$1,314,611.30
AQ7340|17|RETAIL|2|23.83|$528,500.00

In this tutorial, we will be processing NIPS files to sum the unpaid balances (UPBs) on a per-state basis. Referring to the NIPS file layout description, I see I need to look at records where field #2 is "09". What we want to do with this data is accumulate the dollar amount of each UPB under a "state" key, over an entire NIPS file or set of NIPS files, and output the totals by state when we're done.

These are not huge datasets, of course.  But, for the purpose of creating an interesting tutorial, we'll process a small amount of NIPS data and see which states are seeing the most mortgage activity (at least as far as Fannie-Mae new issues are concerned).  The main point here is learning how to process the data and leverage the capabilities of Pig.

Loading the data into HDFS

I will start by downloading all of the available data on the Fannie Mae NIPS page to my local filesystem.  At the time of this tutorial, this included data from the 23rd of August (2012) through the 21st of November.  This set provides just under 400K lines of data.  The next step is to copy it from local storage to HDFS:


$ bin/hadoop dfs -copyFromLocal /home/hduser/dev/pigExamples/nipsData /user/hduser/pigExample

We can verify the transfer to HDFS with:

$ bin/hadoop dfs -ls /user/hduser/pigExample
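
Incidentally, once you're in the Pig shell (introduced in the next section), you can run the same kind of check from grunt; its fs command passes its arguments straight through to the Hadoop filesystem shell:

grunt> fs -ls /user/hduser/pigExample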

Examining a single file with Pig

We're going to start by loading a single file and attempting to filter out lines whose record type is not "09".  I'm assuming you have installed Pig and it is configured to access HDFS.  Start the Pig interpreter:

hduser@ubuntu:~$ pig
2012-11-24 16:56:19,305 [main] INFO  org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-11-24 16:56:19,306 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/hduser/pig_1353801379300.log
2012-11-24 16:56:19,518 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2012-11-24 16:56:19,858 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
grunt> 

You can see from the output that Pig knows I'm running Hadoop in (pseudo-)distributed mode.  If you don't see these connection messages, verify that your PIG_CLASSPATH is set.  Next I'm going to load a single file from HDFS into Pig.  Pig's default loader, PigStorage, assumes the field delimiter is a tab; since our file is pipe ("|") delimited, we pass the delimiter to PigStorage explicitly:

grunt> NIPS_9Nov = load 'pigExample/nips_11092012.txt' using PigStorage('|');

(Note that the path is relative to '/user/hduser'.)  The load will not actually occur until the data is required; a "dump", for example, would force the file to be read.  In fact, if you type

grunt> dump NIPS_9Nov;

you will see a flurry of activity related to the Hadoop MapReduce job(s) created on your behalf, culminating in the actual output of the pipe-delimited text parsed into tuples, the last few lines of which look like the following:

(AQ7340,09,GEORGIA,1,8.98,$199,118.84)
(AQ7340,09,ILLINOIS,1,9.52,$211,250.00)
(AQ7340,09,MICHIGAN,2,19.13,$424,312.98)
(AQ7340,09,MINNESOTA,2,24.93,$552,916.34)
(AQ7340,09,MISSOURI,1,10.82,$239,984.67)
(AQ7340,09,WASHINGTON,2,26.62,$590,528.47)
(AQ7340,10,U.S. BANK N.A.,9,100.0,$2,218,111.30)
(AQ7340,17,BROKER,1,16.91,$375,000.00)
(AQ7340,17,CORRESPONDENT,6,59.27,$1,314,611.30)
(AQ7340,17,RETAIL,2,23.83,$528,500.00)
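
As an aside, dumping a whole relation can generate a lot of console output.  If you just want to peek at a few tuples, Pig's LIMIT operator is handy; a quick sketch (the sample alias name is mine):

grunt> nips_sample = limit NIPS_9Nov 5;
grunt> dump nips_sample;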

That dump output is exactly what we want.  Next we'll look only at the record-type "09" records and accumulate balances at the per-state level.  In a fresh Pig shell, enter the following:

grunt> nips_9nov = load '/user/hduser/pigExample/nips_11092012.txt' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
grunt> fr_9nov = filter nips_9nov by (recordType == 9);
grunt> dump fr_9nov;

This produces output like that shown above, now restricted to the "record type = 09" records.  Again, here is the tail of the output:

(AQ7337,9,WYOMING,1,2.7,$170,619.65)
(AQ7340,9,GEORGIA,1,8.98,$199,118.84)
(AQ7340,9,ILLINOIS,1,9.52,$211,250.00)
(AQ7340,9,MICHIGAN,2,19.13,$424,312.98)
(AQ7340,9,MINNESOTA,2,24.93,$552,916.34)
(AQ7340,9,MISSOURI,1,10.82,$239,984.67)
(AQ7340,9,WASHINGTON,2,26.62,$590,528.47)
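
If you want a quick sanity check on how many record-type-09 rows the filter kept, a GROUP ... ALL followed by the built-in COUNT works; a sketch along these lines (alias names are just for illustration):

grunt> fr_all = group fr_9nov all;
grunt> fr_count = foreach fr_all generate COUNT(fr_9nov);
grunt> dump fr_count;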

At this point I'm going to change direction a little and put the Pig statements into a script, so it is a little easier to capture the output.  Create a new file called "pigTest.pig" and add the following lines to it:

nips_9nov = load '/user/hduser/pigExample/nips_11092012.txt' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr_9nov = filter nips_9nov by (recordType == 9);
dump fr_9nov;

Save the file and invoke it with:

pig -f pigTest.pig &> pigTest.log

Some of Pig's output goes to stderr, so you'll want to capture both stdout and stderr to your log file.  Open the log file and scroll down to:

 Job Stats (time in seconds):

and look at the next two lines, the first of which is a header.  Note that Pig generated only a Map job and no Reduce jobs (Maps = 1, Reduces = 0, Feature = "MAP_ONLY").  Since we are only loading records and filtering them on a field value, no Reduce job was necessary.
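
Back in the grunt shell, you don't even have to run a job to see this: Pig's explain command prints the logical, physical, and MapReduce plans for a relation without executing anything, and for this pipeline it should show a single map-only job:

grunt> explain fr_9nov;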

Next, we'll want to parse the aggregate unpaid balances, sum them by state, and output the totals.  The aggregate UPB arrives as a human-readable, not-much-fun-to-parse bytearray (e.g. $3,759,464.16).  To treat these values as floats we'll have to do a little cleanup.  This may not be terribly efficient, but I used a nested "REPLACE" call:

fr_clean = foreach fr_9nov generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\$', ''), ',', '') as upbFloat;

Note that if you enter this expression in the Pig shell, you'll need two additional escape ("\") characters in front of the dollar sign (which, as in the java.lang.String.replaceAll() method, is treated as part of a regular expression). In a script, you'll need to escape both the dollar sign and the backslash.  Trust me.  fr_clean will now contain cleaned-up unpaid balances that look like real floats.  In the Pig shell, you can verify the schema of the relation (though not that the data will actually parse, since that hasn't happened yet) with the following:

grunt> describe fr_clean;
2012-11-26 23:21:45,570 [main] WARN  org.apache.pig.PigServer - Encountered Warning IMPLICIT_CAST_TO_CHARARRAY 1 time(s).
2012-11-26 23:21:45,570 [main] WARN  org.apache.pig.PigServer - Encountered Warning USING_OVERLOADED_FUNCTION 1 time(s).
fr_clean: {poolNumber: bytearray,state: bytearray,numberOfLoans: int,percentageUpb: float,upbFloat: float}
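
describe only reports the schema.  If you want some evidence that the cleanup expression will actually parse your data, Pig's illustrate command runs a small sample of the input through the pipeline and displays the intermediate results; its output can be verbose, but it makes for a quick check:

grunt> illustrate fr_clean;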

The final steps to output the states (and the District of Columbia) with the total unpaid balances of all new issues (for this file, in millions of dollars) are:

grunt> byState = group fr_clean by state;
grunt> totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0;
grunt> dump totalUpb;

I've glossed over these steps, but basically you are grouping by state, summing the unpaid balances on a per-state basis, and dividing the totals by one million to express them in millions of dollars.  After the dump completes, we get 51 lines of output, the last few of which are shown here:

...
(CALIFORNIA,1021.2734624101563)
(NEW JERSEY,103.9833925234375)
(NEW MEXICO,18.8126310078125)
(WASHINGTON,153.9220293671875)
(CONNECTICUT,33.7019688515625)
(MISSISSIPPI,24.3124981796875)
(NORTH DAKOTA,7.280279875)
(PENNSYLVANIA,147.6614224453125)
(RHODE ISLAND,15.24327924609375)
(SOUTH DAKOTA,10.51592517578125)
(MASSACHUSETTS,156.1397877109375)
(NEW HAMPSHIRE,16.52243540234375)
(WEST VIRGINIA,8.3394678828125)
(NORTH CAROLINA,129.42278906640624)
(SOUTH CAROLINA,70.23617646875)
(DISTRICT OF COLUMBIA,18.288814109375)

In other words, California totaled slightly more than one billion dollars for the pools issued on the 9th of November in 2012.  

To wrap things up a little, I'll next run the same analysis from a Pig script file.  I mentioned earlier that we need to be careful about the escape characters in the "REPLACE" call.  Here's the script to process a single file:

nips_9nov = load '/user/hduser/pigExample/nips_11092012.txt' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr_9nov = filter nips_9nov by (recordType == 9);
fr_clean = foreach fr_9nov generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0;
dump totalUpb;
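
If you would rather write the per-state totals to HDFS than scroll through console output, you can replace the dump with a store; the output path below is just an example, and the directory must not already exist:

store totalUpb into '/user/hduser/pigExampleOutput' using PigStorage(',');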

Processing the entire dataset

There's not much left to do here but run against the entire dataset, which in our case is about three months' worth of new-issues files.  A slight modification to the script:

nips = load '/user/hduser/pigExample' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr = filter nips by (recordType == 9);
fr_clean = foreach fr generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0 as total;
sortedUpb = order totalUpb by total;
dump sortedUpb;

results in similar data (sorted in ascending order of total aggregate UPB), of course with larger numbers.  For example, we see that during the three months starting in late August 2012, new Fannie Mae pools representing $629M were issued for properties in Alaska.  You can also see from the output file that one Map and one Reduce job were created, and, I have to admit, that quite a number of records were dropped (due to parse failures):

2012-11-26 23:51:38,269 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 2396 time(s).
2012-11-26 23:51:38,269 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 30869 time(s).

On first inspection, it appears that 2396 "record type = 9" records didn't have enough fields to provide an aggregate unpaid balance column, and that quite a few balances failed to convert.  I did not investigate these records; however, warnings like these generally tell you that you need to modify your parse logic.  In other words -- a good topic for another post!
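
One simple guard, which I have not applied here (a sketch only; the alias name is mine), would be to drop rows whose balance field is missing before attempting the conversion, and feed the result to the foreach instead of fr:

fr_upb = filter fr by aggregateUpb is not null;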
