
Monday 4 January 2016

Running a sample Pi example

To run any application on top of YARN, you need to use the following command syntax:
$ yarn jar <application_jar.jar> <arg0> <arg1>

To run a sample example to calculate the value of Pi with 16 maps and 10,000 samples, use the following command:
$ yarn jar $YARN_EXAMPLES/hadoop-mapreduce-examples-2.4.0.2.1.1.0-385.jar pi 16 10000

Note that we are using hadoop-mapreduce-examples-2.4.0.2.1.1.0-385.jar here; the JAR version may change depending on your installed Hadoop distribution.
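
If you are not sure which examples JAR your installation ships, you can list it directly (this path assumes a standard Apache Hadoop layout):
$ ls $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar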

Once you run the preceding command, you will see the logs generated by the application on the console, as shown in the following output. The default logger configuration is used here.

The default log level is INFO; you can change it by overriding the default logger settings, setting hadoop.root.logger=WARN,console in conf/log4j.properties:

Number of Maps  = 16
Samples per Map = 10000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Wrote input for Map #10
Wrote input for Map #11
Wrote input for Map #12
Wrote input for Map #13
Wrote input for Map #14
Wrote input for Map #15

Starting Job
11/09/14 21:12:02 INFO mapreduce.Job: map 0% reduce 0% 
11/09/14 21:12:09 INFO mapreduce.Job: map 25% reduce 0% 
11/09/14 21:12:11 INFO mapreduce.Job: map 56% reduce 0% 
11/09/14 21:12:12 INFO mapreduce.Job: map 100% reduce 0% 
11/09/14 21:12:12 INFO mapreduce.Job: map 100% reduce 100% 
11/09/14 21:12:12 INFO mapreduce.Job: Job job_1381790835497_0003 completed successfully 
11/09/14 21:12:19 INFO mapreduce.Job: Counters: 44        

        File System Counters
                FILE: Number of bytes read=358
                FILE: Number of bytes written=1365080
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=4214
                HDFS: Number of bytes written=215
                HDFS: Number of read operations=67
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=3
        Job Counters
                Launched map tasks=16
                Launched reduce tasks=1
                Data-local map tasks=14
                Rack-local map tasks=2
                Total time spent by all maps in occupied slots  (ms)=184421
                Total time spent by all reduces in occupied slots (ms)=8542
        Map-Reduce Framework
                Map input records=16
                Map output records=32
                Map output bytes=288
                Map output materialized bytes=448
                Input split bytes=2326
                Combine input records=0
                Combine output records=0
                Reduce input groups=2
                Reduce shuffle bytes=448
                Reduce input records=32
                Reduce output records=0
                Spilled Records=64
                Shuffled Maps =16
                Failed Shuffles=0
                Merged Map outputs=16
                GC time elapsed (ms)=195 
                CPU time spent (ms)=7740
                Physical memory (bytes) snapshot=6143396896
                Virtual memory (bytes) snapshot=23142254400
                Total committed heap usage (bytes)=43340769024
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0 
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=1848
        File Output Format Counters
                Bytes Written=98
Job Finished in 23.144 seconds 

Estimated value of Pi is 3.14127500000000000000

If you compare this example running over Hadoop 1.x with the same example running over YARN, you can hardly tell them apart by looking at the logs, but you can clearly see the difference in performance. YARN has backward-compatibility support for MapReduce 1.x applications without any code changes.






Running sample examples on YARN

Running the available sample MapReduce programs is a simple task with YARN. The Hadoop distribution ships with some basic MapReduce examples. 

You can find them inside $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-<HADOOP_VERSION>.jar. 

The location of the file may differ depending on your Hadoop
installation folder structure.

Let’s include this in the YARN_EXAMPLES path:
$ export YARN_EXAMPLES=$HADOOP_HOME/share/hadoop/mapreduce

Now the path to the sample examples is available in the YARN_EXAMPLES environment variable. You can access all the examples using this variable; to list all the available examples, type the following command on the console:

$ yarn jar $YARN_EXAMPLES/hadoop-mapreduce-examples-2.4.0.2.1.1.0-385.jar

An example program must be given as the first argument.

The valid program names are as follows:

  • aggregatewordcount : This is an aggregate-based map/reduce program that counts the words in the input files
  • aggregatewordhist : This is an aggregate-based map/reduce program that computes the histogram of the words in the input files
  • bbp : This is a map/reduce program that uses Bailey-Borwein-Plouffe to compute the exact digits of Pi
  • dbcount : This is an example job that counts the page view counts from a database
  • distbbp : This is a map/reduce program that uses a BBP-type formula to compute the exact bits of Pi
  • grep : This is a map/reduce program that counts the matches of a regex in the input
  • join : This is a job that effects a join over sorted, equally-partitioned datasets
  • multifilewc : This is a job that counts words from several files
  • pentomino : This is a map/reduce tile-laying program used to find solutions to pentomino problems
  • pi : This is a map/reduce program that estimates Pi using a quasi-Monte Carlo method
  • randomtextwriter : This is a map/reduce program that writes 10 GB of random textual data per node
  • randomwriter : This is a map/reduce program that writes 10 GB of random data per node
  • secondarysort : This is an example that defines a secondary sort to the reduce
  • sort : This is a map/reduce program that sorts the data written by the random writer
  • sudoku : This is a sudoku solver
  • teragen : This generates data for the terasort
  • terasort : This runs the terasort
  • teravalidate : This checks the results of terasort
  • wordcount : This is a map/reduce program that counts the words in the input files
  • wordmean : This is a map/reduce program that counts the average length of the words in the input files
  • wordmedian : This is a map/reduce program that counts the median length of the words in the input files
  • wordstandarddeviation : This is a map/reduce program that counts the standard deviation of the length of the words in the input files
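
For example, to run the wordcount program against a directory that already exists in HDFS, you would use something like the following (the input and output paths here are only illustrative):
$ yarn jar $YARN_EXAMPLES/hadoop-mapreduce-examples-2.4.0.2.1.1.0-385.jar wordcount /user/hduser/input /user/hduser/output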


These are the sample examples that come as part of the Hadoop distribution by default. 


Source compatibility of org.apache.hadoop.mapreduce APIs

Source incompatibility means that some code changes are required for compilation.

Source incompatibility is orthogonal to binary compatibility.

Binaries for an application that is binary compatible but not source compatible will continue to run fine on the new framework. However, code changes are required to regenerate these binaries.

Apache Hadoop 2.x does not ensure complete binary compatibility with the applications that use org.apache.hadoop.mapreduce APIs, as these APIs have evolved a lot since MRv1. However, it ensures source compatibility for org.apache.hadoop.mapreduce APIs that break binary compatibility. In other words, you should recompile the applications that use MapReduce APIs against MRv2 JARs.

Existing applications that use MapReduce APIs are source compatible and can run on YARN with no changes, with a simple recompilation, or with minor updates.

If an MRv1 MapReduce-based application fails to run on YARN, you should investigate its source code and check whether MapReduce APIs are referenced. If they are, you have to recompile the application against the MRv2 JARs that are shipped with Hadoop 2.
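
For a Maven-based build, recompiling against MRv2 usually just means depending on the Hadoop 2.x client artifact; a minimal sketch of the pom.xml dependency (the version shown here is only illustrative and should match your cluster):

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.4.0</version>
</dependency>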

Old and new MapReduce APIs

The new API (which is also known as Context Objects) was primarily designed to make the API easier to evolve in the future and is type incompatible with the old one.

The new API first appeared in the 1.x release series, but it was only partially supported there, so the old API is still recommended for the 1.x series:


Feature\Release           1.x        0.23
Old MapReduce API         Yes        Deprecated
New MapReduce API         Partial    Yes
MRv1 runtime (Classic)    Yes        No
MRv2 runtime (YARN)       No         Yes


The old and new API can be compared as follows:


  • The old API is in the org.apache.hadoop.mapred package and is still present; the new API is in the org.apache.hadoop.mapreduce package.
  • The old API uses interfaces for Mapper and Reducer; the new API uses abstract classes for Mapper and Reducer.
  • The old API uses the JobConf, OutputCollector, and Reporter objects to communicate with the MapReduce system; the new API uses the Context object.
  • In the old API, job control is done through the JobClient; in the new API, job control is performed through the Job class.
  • In the old API, job configuration is done with a JobConf object; in the new API, configuration is done through the Configuration class, via some of the helper methods on Job.
  • In the old API, both the map and reduce outputs are named part-nnnnn; in the new API, the map outputs are named part-m-nnnnn and the reduce outputs are named part-r-nnnnn.
  • In the old API, the reduce() method passes values as a java.lang.Iterator; in the new API, reduce() passes values as a java.lang.Iterable.
  • The old API controls mappers by writing a MapRunnable, but there is no equivalent for reducers; the new API allows both mappers and reducers to control the execution flow by overriding the run() method.
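
To make the difference concrete, here is a minimal sketch of the same trivial mapper declared against each API. The class names and the one-record-per-line logic are only illustrative:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old API: Mapper is an interface, output goes through an OutputCollector,
// and a separate Reporter is used for status/counters.
class OldStyleMapper extends MapReduceBase
    implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, IntWritable> {
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    output.collect(new Text(value.toString()), new IntWritable(1));
  }
}

// New API: Mapper is an abstract class, and a single Context object
// replaces both OutputCollector and Reporter.
class NewStyleMapper
    extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(new Text(value.toString()), new IntWritable(1));
  }
}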





Tuesday 21 October 2014

Hadoop MapReduce -- A Java vs Pig Example

In a recent post, I used Pig to analyze some MBS (mortgage-backed security) new-issue pool data from Fannie Mae.  At the time, I noted a number of errors processing the files in my data set, and now I'm going to go back and debug the job.  My goal is to clean up my parsing/processing logic sufficiently to get most, if not all, of the records included in my data.  Whether this is important or not depends on the task, of course.  If you are trying to reconcile records between two systems (e.g. to find lost revenue in a telecom billing system), you'd prefer not to lose any records.  If you are examining polling data to predict an election, you can probably afford to lose a few records.

I'm going to cheat a little, since I feel it's a little easier to debug in a Java MapReduce application than in Pig.  So I'm going to go back to Java, examine the data in a little more detail there, and then see what I could have done in Pig, had that been my only option.  I'll use the "new" MapReduce API available since Hadoop 0.20, referred to as "Context Objects".

To refresh your memory or to save you the trouble of reading my previous post, I'm looking through the files that Fannie Mae makes available after each business day, showing the new issues of mortgage pools from the day before.  I had processed the files to determine the total amount of unpaid balance (UPB) on a per-state basis.  Another important point I will repeat here is that these files contain records of several different types (and widths); you determine the record type by looking at column #2 and then referring to a file that contains the description for that record type.

Since my last post on this topic, a few more NIPS (New Issue Pool Statistics) files have been made available.  I import all of them into HDFS with

hadoop dfs -copyFromLocal /home/hduser/fannie-mae-nips /user/hduser

This set gives me data from 23 August to 26 December 2012.

 The Java MapReduce Job

I write a mapper and reducer, the main details of which follow.  First, my (Maven) project:

 mvn archetype:generate -DgroupId=com.example.fannieMaeNips
-DartifactId=fannie-mae-nips -DarchetypeArtifactId=maven-archetype-quickstart
-DinteractiveMode=false


Next, edit the pom.xml file and add the Hadoop dependency:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.1.0</version>
</dependency>


I drill down into the Java source directory, delete the Maven-supplied sample class, and create NipsMapReduce.java.  The new MapReduce API is found in the org.apache.hadoop.mapreduce package, rather than the old org.apache.hadoop.mapred.  I mention this again because the similarities between the classes in the two packages can lead to some odd errors at compile (or run) time if you use one package and "write to the other."  For more details, see online resources for Hadoop or Tom White's Hadoop: The Definitive Guide.

Here is the full source code for the mapper and reducer:

package com.example.fannieMaeNips;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NipsMapReduce
{

  static class NipsMapReduceMapper
    extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    private Text state = new Text();
    public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
      // the default InputFormat for Mapper is TextInputFormat,
      // which splits a file into lines.
      // here, the key is the offset into the file;
      // value is the value of the current line of text:
      String nextLine = value.toString();
      // note the argument to java.lang.String.split() is a regex:
      String fields[] = nextLine.split("\\|");
      // are we looking at record type = 9 (Geographic Distribution)?
      if (fields != null && fields.length == 6 && Integer.parseInt(fields[1]) == 9) {
        // set the key to the name of the state:
        state.set(fields[2]);
        // parse the Aggregate Unpaid Balance field
        // "$ddd,ddd,ddd.dd" format into a double:
        double aggUPB =
          Double.parseDouble(fields[5].substring(1).replace(",", ""));
System.out.println("Adding upb " + aggUPB + " (" + fields[5] + ") to state " + fields[2]);
        // write the state and the aggregate unpaid balance
        // out to the Context:
        context.write(state, new DoubleWritable(aggUPB));
      }
    }
  }
  static class NipsMapReduceReducer
    extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
      throws IOException, InterruptedException {
      double sum = 0.0;
      for (DoubleWritable value: values) {
System.out.println("Summing " + value.get() + " into state '" + key + "'");
        sum += value.get();
      }
      context.write(key, new DoubleWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception
  {
    if (args.length != 2) {
      System.err.println("Usage: NipsMapReduce <input path> <output path>");
      System.exit(-1);
    }
    Job job = new Job();
    job.setJarByClass(NipsMapReduce.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(NipsMapReduceMapper.class);
    job.setReducerClass(NipsMapReduceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    System.exit(job.waitForCompletion(true)? 0:1);
  }
}
 


I've added a couple of comments to help, plus I put in some output to stdout to check in the log files after the job runs.  After I build the class, I export the path to the jarfile with HADOOP_CLASSPATH:

export HADOOP_CLASSPATH=/home/hduser/dev/fannie-mae-nips/target/fannie-mae-nips-1.0-SNAPSHOT.jar

At which point, we can run:

bin/hadoop com.example.fannieMaeNips.NipsMapReduce /user/hduser/fannie-mae-nips /user/hduser/fannie-mae-nips-output

Hadoop is running in pseudo-distributed mode, so the input and output paths are in HDFS.

There's a fair amount of output to stdout, generated by Hadoop, during the run.  My output to stdout is captured in the Hadoop logs.  Somewhere near the end of the output, we see the following:

12/12/27 16:17:16 INFO mapred.JobClient:     Reduce input groups=54
This number includes the 50 states, plus the District of Columbia, Puerto Rico, Guam and the U.S. Virgin Islands.

If I look at the Hadoop log files, I will see my output to stdout in the "stdout" files for each "attempt".  To see the "stdout" and "stderr" files, I switch to the userlogs output directory for this job:

cd $HADOOP_PREFIX/logs/userlogs/job_201212271420_0001
 

Here's some example output of my mapper, followed by sample output from my reducer:

...
Adding upb 339366.0 ($339,366.00) to state ILLINOIS
Adding upb 679920.0 ($679,920.00) to state KENTUCKY
Adding upb 440944.88 ($440,944.88) to state NORTH CAROLINA
Adding upb 624000.0 ($624,000.00) to state OHIO
Adding upb 194238.0 ($194,238.00) to state SOUTH CAROLINA
...

...
Summing 988363.58 into state 'MASSACHUSETTS'
Summing 372349.79 into state 'MASSACHUSETTS'
Summing 2015439.31 into state 'MASSACHUSETTS'
Summing 1237239.1 into state 'MASSACHUSETTS'
Summing 250000.0 into state 'MASSACHUSETTS'
...

 

Note how the mapper is processing states as they appear in the NIPS files, while this particular reducer has been given all the values for a particular key ("MASSACHUSETTS") as a group.  I also notice that all of my "stderr" files are empty, leading me to believe that I had no parse errors.  The directories of each attempt are links, so to recursively list them and grep for the stderr entries, I issue the following command:

ls -lLR | grep stderr | less

which gives me a stream of output like the following:

...
-rw-r--r-- 1 hduser hadoop     0 Dec 27 16:16 stderr
-rw-r--r-- 1 hduser hadoop     0 Dec 27 16:16 stderr
-rw-r--r-- 1 hduser hadoop    0 Dec 27 16:17 stderr
-rw-r--r-- 1 hduser hadoop    0 Dec 27 16:12 stderr
-rw-r--r-- 1 hduser hadoop       0 Dec 27 16:12 stderr

... 

So it appears the Java MapReduce application parsed all the files with no errors.  This result makes me curious to try my earlier post's Pig queries against the same data set. 


Revisiting the Pig Query Example

From my earlier post, I retrieve and modify my Pig script to point to the current data set:


nips = load '/user/hduser/fannie-mae-nips' using PigStorage('|') as (poolNumber:bytearray, recordType:int, state:bytearray, numberOfLoans:int, percentageUpb:float, aggregateUpb:bytearray);
fr = filter nips by (recordType == 9);
fr_clean = foreach fr generate poolNumber, state, numberOfLoans, percentageUpb, (float)REPLACE(REPLACE(aggregateUpb, '\\\$', ''), ',', '') as upbFloat;
byState = group fr_clean by state;
totalUpb = foreach byState generate group, SUM(fr_clean.upbFloat)/1000000.0 as total;
sortedUpb = order totalUpb by total;
dump sortedUpb;


Then, I run it with:

pig -f nipsDump.pig &> nipsDumpPig.log

First, let's compare a few values.  For California, my Java MapReduce program returned the following total:


bin/hadoop dfs -cat /user/hduser/fannie-mae-nips-output/part-r-00000 | grep CALIFORNIA
CALIFORNIA 8.911464282207979E10

while the Pig output, scaled by 10^6 (dollars), is


(CALIFORNIA,89114.64280685547)
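
As a quick arithmetic check, 8.911464282207979E10 / 1,000,000 ≈ 89,114.6428, which matches the Pig figure to within rounding (the Pig script casts the balance to float, while the Java job sums doubles).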

In other words, identical.  Going back to the Pig log file, I see

2012-12-28 11:36:18,471 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning ACCESSING_NON_EXISTENT_FIELD 3348 time(s).
2012-12-28 11:36:18,476 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning FIELD_DISCARDED_TYPE_CONVERSION_FAILED 41331 time(s).

and yet, after checking all 54 outputs of both jobs, they are identical.  And there were no errors reported in the Java MapReduce program.  A false alarm, in other words, due to my inexperience with Pig.

A quick Internet search shows that I have likely run into an issue related to the unique nature of the records in the NIPS files, namely, that they contain records of several types (and widths).  My issue probably occurs since my Pig parse command attempts to grab the first 6 elements of every row, and not every row has 6 elements.  This issue doesn't occur in the Java MapReduce program because I check the length of the array into which my line has been split before processing it.  This failure only generates a warning, of course, and so does not affect the results of the Pig query.

To test whether this really is the case, I add an additional counter to the Java MapReduce to count the number of times that a row has fewer than 6 elements.  Since my mapper is outputting DoubleWritables, I increment the key's value by 1.0 on each hit to keep things simple.  It should be easy to find the "non existent field" warning.  Looking at Fannie Mae's NIPS file format, it appears there are 3 record types with fewer than 6 columns, and hopefully for this file set they will add up to 3348.

Finding the type-conversion warnings might be a little more challenging, and time-consuming (especially since this is mostly for my own amusement) -- I would need to check every non-interesting line to see if the fields don't match my expected types.  For example, for record type '01', column 5 is a character string, which would cause a failure, since I try to parse it as a float.  I think I will just check for the "fewer than 6 columns" issue as a sanity check.



Adding the following block to the mapper of my Java MapReduce program, after the "if" statement evaluating the number of columns:


      else if (fields != null && fields.length < 6) {
        state.set("ShortRecords");
        context.write(state, new DoubleWritable(1.0));
      }


and re-running with:

export HADOOP_CLASSPATH=/home/hduser/dev/fannie-mae-nips/target/fannie-mae-nips-1.0-SNAPSHOT.jar
bin/hadoop com.example.fannieMaeNips.NipsMapReduce /user/hduser/fannie-mae-nips /user/hduser/fannie-mae-nips-output2

I get the following output:


SOUTH DAKOTA    1.0024923021099998E9
ShortRecords    3348.0
TENNESSEE       4.426305956350002E9

right in order, as expected, between South Dakota and Tennessee.

When I started this post, it was intended to be about debugging MapReduce jobs.  That would be a nice post, although not very original.  It ended up being a post comparing Java MapReduce with Pig MapReduce; hopefully it was interesting.  If nothing else it shows how compact a Pig query can be, even though the Java code wasn't very verbose either, for this admittedly simple case.

Snappy compression with Pig and native MapReduce

This assumes you have installed Hadoop with Snappy support on your cluster; if not, please follow http://code.google.com/p/hadoop-snappy/.
This is the machine configuration of my cluster nodes, though the steps that follow should work with your own installation/machine configuration:
pkommireddi@pkommireddi-wsl:/tools/hadoop/pig-0.9.1/lib$ uname -a
Linux pkommireddi-wsl 2.6.32-37-generic #81-Ubuntu SMP Fri Dec 2 20:32:42 UTC 2011 x86_64 GNU/Linux
Pig requires that the Snappy JAR and the native library be available on its classpath when a script is run.
The Pig client here is installed under /tools/hadoop, and the JAR needs to be placed within $PIG_HOME/lib:
/tools/hadoop/pig-0.9.1/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
Also, you need to point Pig to the Snappy native library:
export PIG_OPTS="$PIG_OPTS -Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64"
Now you have two ways to enable map output compression in your Pig scripts:
  1. Follow instructions on http://code.google.com/p/hadoop-snappy/ to set map output compression at a cluster level
  2. Use Pig’s “set” keyword for per job level configuration
    set mapred.compress.map.output true;
    set mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;
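Putting option 2 to work, the per-job settings simply go at the top of the script; a minimal sketch, with an illustrative load path and alias name:
  set mapred.compress.map.output true;
  set mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;
  -- the rest of the script is unchanged; only the map output is Snappy-compressed
  raw = load '/user/hduser/input' using PigStorage('|');
  dump raw;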
This should get you going with using Snappy for map output compression with Pig. You can read and write Snappy-compressed files as well, though I would not recommend doing that, as it is not very space-efficient compared to other compression algorithms. There is work being done to enable Snappy for creating intermediate/temporary files between multiple MR jobs; you can watch the work item here: https://issues.apache.org/jira/browse/PIG-2319
Using Snappy for Native Java MapReduce:
Set Configuration parameters for Map output compression
Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.set("mapred.map.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec
Set Configuration parameters for Snappy compressed intermediate Sequence Files
conf.setOutputFormat(SequenceFileOutputFormat.class); // here conf is a JobConf, since these are old (mapred) API calls
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); // block level is better than record level, in most cases
SequenceFileOutputFormat.setCompressOutput(conf, true);
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
Benefits:
  1. Map tasks begin transferring data sooner compared to Gzip or Bzip (though more data needs to be transferred to Reduce tasks)
  2. Reduce tasks run faster with better decompression speeds
  3. Snappy is not CPU intensive – which means MR tasks have more CPU for user operations
What you SHOULD use Snappy for
Map output: Snappy works great if you have large amounts of data flowing from Mappers to the Reducers (you might not see a significant difference if data volume between Map and Reduce is low)
Temporary intermediate files (not available currently as of Pig 0.9.2, applicable only to native MapReduce): If you have a series of MR jobs chained together, Snappy compression is a good way to store the intermediate files. Please make sure these intermediate files are cleaned up soon enough so that there are no disk space issues on the cluster.
What you should NOT use Snappy for
Permanent Storage: Snappy compression is not efficient space-wise and it is expensive to store data on HDFS (3-way replication)
Plain text files: Like Gzip, Snappy is not splittable. Do not store plain text files in Snappy-compressed form; instead, use a container format like SequenceFile.