
Monday 20 October 2014

Run the Chain MapReduce Job


Sometimes we need to run several dependent mappers and reducers chained together inside a single MapReduce job (for example Map ----> Map ----> Reduce ----> Map).
Here is one example of such a chained MapReduce job: TokenizerMapper ----> LowerCaserMapper ----> WordCountReducer ----> LastMapper.


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.*;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

import com.orienit.hadoop.training.Dictionary;
import com.orienit.hadoop.training.SgmParser;

public class ChainWordCountDriver extends Configured implements Tool { 

// TokenizerMapper - parses each input record and emits every token with a count of 1
public static class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);

    // SgmParser strips the SGML tags and removes the stop words.
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        line = SgmParser.parse(line);
        line = line.replaceAll("\\s+", " ").trim();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            output.collect(new Text(tokenizer.nextToken()), one);
        }
    }
}

// LowerCaserMapper - lower-cases each token passed on from TokenizerMapper
// (declared static so Hadoop can instantiate it via reflection)
public static class LowerCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String word = key.toString().toLowerCase();
        output.collect(new Text(word), value);
    }
}

// WordCountReducer - sums up the counts for each word
public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

// LastMapper - takes each word/count pair emitted by the reducer, matches the word against
// the WordNet dictionary to find its synsets, and writes the final output record.
public static class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, Text> {

    public void map(Text key, IntWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String word = key.toString();
        StringBuffer sbr = new StringBuffer();
        sbr.append(key.toString() + "\t" + value.toString());
        // System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
        String matched = Dictionary.match(word);
        output.collect(new Text(sbr.toString()), new Text(matched));
    }
}
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
    // conf.setJobName("wordcount");

    // Set the input and output paths
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(conf, outputPath);

    // The input is SGML/XML, so use XmlInputFormat and treat everything between
    // <TEXT> and </TEXT> as one record; the output is plain text.
    conf.setInputFormat(XmlInputFormat.class);
    conf.set("xmlinput.start", "<TEXT>");
    conf.set("xmlinput.end", "</TEXT>");
    conf.setOutputFormat(TextOutputFormat.class);
    conf.set("mapred.textoutputformat.separator", "\n");

    // addMapper takes the global conf, the mapper class, its input and output key/value types,
    // whether key/values are passed by value (true) or by reference, and a local JobConf for this mapper.
    JobConf mapAConf = new JobConf(false);
    ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);

    JobConf mapBConf = new JobConf(false);
    ChainMapper.addMapper(conf, LowerCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

    JobConf reduceConf = new JobConf(false);
    reduceConf.setCombinerClass(WordCountReducer.class);
    ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobConf mapCConf = new JobConf(false);
    ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, Text.class, true, mapCConf);

    JobClient.runJob(conf);
    return 0;
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
    System.exit(res);
}}
--------------SGMParser.java-------------------------

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

public class SgmParser {
    public static String parse(String line) throws IOException {
        final String LINE_SEPARATOR = System.getProperty("line.separator");

        // stopwords.txt is loaded from the classpath, next to this class
        InputStream fstream = SgmParser.class.getResourceAsStream("stopwords.txt");
        BufferedReader sfbr2 = null;
        String token = null;

        Pattern EXTRACTION_PATTERN = Pattern.compile("<BODY>(.*?)</BODY>");

        String[] META_CHARS = {"&", "<", ">", "\"", "'"};
        String[] META_CHARS_SERIALIZATIONS = {"&amp;", "&lt;", "&gt;", "&quot;", "&apos;"};

        int index = -1;
        StringBuffer buffer = new StringBuffer();
        StringBuffer buffer1 = new StringBuffer();
        String parse = "";
        String lt = "<";
        String gt = ">";

        // Strip the SGML tags, keeping only the text between a closing '>' and the next '<'
        for (index = line.indexOf(lt); index >= 0; index = line.indexOf(lt, index + 1)) {
            int ct = line.indexOf(gt, index + 1);
            int ot = line.indexOf(lt, index + 1);
            if (ot != -1) {
                buffer1.append(line.substring(ct + 1, ot)).append(" ");
            }
        }
        if (buffer1.length() == 0) {
            buffer1.append(line);
        }
        parse = buffer1.toString().toLowerCase();
        parse = parse.replaceAll("[^a-zA-Z]", " ");
        parse = parse.replaceAll("\\s+", " ").trim();

        if ((index = parse.indexOf("</REUTERS")) == -1) {
            // Accumulate the strings for now, then apply the regular expression to get the pieces
            buffer.append(parse).append(' ');
        } else {
            // Extract the relevant pieces (the <BODY> sections)
            Matcher matcher = EXTRACTION_PATTERN.matcher(parse);
            while (matcher.find()) {
                for (int i = 1; i <= matcher.groupCount(); i++) {
                    if (matcher.group(i) != null) {
                        buffer.append(matcher.group(i));
                    }
                    buffer.append(LINE_SEPARATOR).append(LINE_SEPARATOR);
                }
            }
        }

        // Replace the SGML escape sequences
        String out = buffer.toString();
        for (int i = 0; i < META_CHARS_SERIALIZATIONS.length; i++) {
            out = out.replaceAll(META_CHARS_SERIALIZATIONS[i], META_CHARS[i]);
        }

        // Remove the stop words
        sfbr2 = new BufferedReader(new InputStreamReader(fstream, "UTF-8"));
        while ((token = sfbr2.readLine()) != null) {
            out = out.replaceAll("\\b" + token.trim() + "\\b", "");
        }
        sfbr2.close();
        return out;
    }
}
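For a quick sanity check outside of Hadoop, the parser can be driven from a plain main method. This is only a sketch (the class name and the sample input line are made up for illustration), and stopwords.txt must be on the classpath next to SgmParser:

public class SgmParserDemo {
    public static void main(String[] args) throws Exception {
        // Tags are stripped, the text is lower-cased, non-letters are removed
        // and stop words are filtered out before the line is returned.
        String raw = "<REUTERS><TEXT><BODY>The Quick Brown Fox jumped 42 times!</BODY></TEXT></REUTERS>";
        System.out.println(SgmParser.parse(raw));
    }
}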

----------------------------Dictionary.java--------------------------------

import edu.smu.tspell.wordnet.Synset;
import edu.smu.tspell.wordnet.WordNetDatabase;

public class Dictionary {

    public static String match(String searchword) {
        // Get the synsets containing the word form (requires wordnet.database.dir to point at
        // the WordNet dict directory, e.g. /home/hadoop/WordnetDictionary/dict)
        WordNetDatabase database = WordNetDatabase.getFileInstance();
        Synset[] synsets = database.getSynsets(searchword);

        StringBuffer sbfr = new StringBuffer();

        // Append the word forms and definition for every synset retrieved
        if (synsets.length > 0) {
            for (int i = 0; i < synsets.length; i++) {
                String[] wordForms = synsets[i].getWordForms();
                for (int j = 0; j < wordForms.length; j++) {
                    sbfr.append((j > 0 ? ", " : "") + wordForms[j]);
                }
                sbfr.append(": " + synsets[i].getDefinition() + "\n");
            }
        } else {
            sbfr.append("Not Found");
        }
        return sbfr.toString();
    }
}
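A standalone check of the WordNet lookup might look like the sketch below. The class name and the search word are made up for illustration, and wordnet.database.dir (the path from the commented-out line above) must point at a local WordNet dict directory:

public class DictionaryDemo {
    public static void main(String[] args) {
        // JAWS reads the WordNet data files from this system property.
        System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
        // Prints the word forms and definition of every matching synset, or "Not Found".
        System.out.println(Dictionary.match("computer"));
    }
}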

XMLInputFormat for old mapred API


Parse XML files with the old mapred API by using the XmlInputFormat class below.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormat extends TextInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  @Override
  public RecordReader<LongWritable,Text> getRecordReader(InputSplit inputSplit,
                                                         JobConf jobConf,
                                                         Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) inputSplit, jobConf);
  }

  /**
   * XMLRecordReader class to read through a given xml document to output xml
   * blocks as records as specified by the start tag and end tag
   *
   */
  public static class XmlRecordReader implements
      RecordReader<LongWritable,Text> {
    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();

    public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
      startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
      endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

      // open the file and seek to the start of the split
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(jobConf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
      if (fsin.getPos() < end) {
        if (readUntilMatch(startTag, false)) {
          try {
            buffer.write(startTag);
            if (readUntilMatch(endTag, true)) {
              key.set(fsin.getPos());
              value.set(buffer.getData(), 0, buffer.getLength());
              return true;
            }
          } finally {
            buffer.reset();
          }
        }
      }
      return false;
    }

    @Override
    public LongWritable createKey() {
      return new LongWritable();
    }

    @Override
    public Text createValue() {
      return new Text();
    }

    @Override
    public long getPos() throws IOException {
      return fsin.getPos();
    }

    @Override
    public void close() throws IOException {
      fsin.close();
    }

    @Override
    public float getProgress() throws IOException {
      return (fsin.getPos() - start) / (float) (end - start);
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException    {
      int i = 0;
      while (true) {
        int b = fsin.read();
        // end of file:
        if (b == -1) return false;
        // save to buffer:
        if (withinBlock) buffer.write(b);

        // check if we're matching:
        if (b == match[i]) {
          i++;
          if (i >= match.length) return true;
        } else i = 0;
        // see if we've passed the stop point:
        if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
      }
    }
  }
}
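To use this input format in an old-API job, point the JobConf at XmlInputFormat and set the start and end tags, just as the ChainWordCountDriver above does. Below is a minimal, self-contained sketch (the driver class name is a placeholder; it simply passes the extracted XML blocks through an identity map/reduce):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class XmlJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(XmlJobDriver.class);
    conf.setJobName("xml-records");

    conf.setInputFormat(XmlInputFormat.class);
    conf.set("xmlinput.start", "<TEXT>");   // each record starts at this tag ...
    conf.set("xmlinput.end", "</TEXT>");    // ... and ends at this one
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // Identity map/reduce: the mapper receives one <TEXT>...</TEXT> block as the Text value per call.
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}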

MapReduce Code Testing

MR code testing using MRUnit


Apache MRUnit provides a testing framework for your MapReduce code. You can test your code in a
local environment first and then run it on the cluster.


import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.mrunit.*;
import org.apache.hadoop.mrunit.types.Pair;
import com.google.common.collect.ImmutableList;
// The original listing omits the static import for assertThat(...); a fluent assertion
// library such as FEST-Assert provides it:
import static org.fest.assertions.Assertions.assertThat;

public class MRJobTest {

  private MapDriver<LongWritable, Text, Text, Text> mapDriver;
  private ReduceDriver<Text, Text, Text, Text> reduceDriver;
  private MapReduceDriver<LongWritable, Text, Text, Text, Text, Text> mapReduceDriver;

public class InvertedIndexMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
  public static final int RETAILER_INDEX = 0;

  @Override
  public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
     final String[] record = StringUtils.split(text.toString(), ",");
     final String retailer = record[RETAILER_INDEX];
     for (int i = 1; i < record.length; i++) {
        final String keyword = record[i];
        outputCollector.collect(new Text(keyword), new Text(retailer));
     }
   }
  }

 public class InvertedIndexReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

  @Override
  public void reduce(Text text, Iterator<Text> textIterator, OutputCollector<Text, Text> outputCollector, Reporter reporter)
   throws IOException {
    final String retailers = StringUtils.join(textIterator, ',');
    outputCollector.collect(text, new Text(retailers));
 }
}

@Before
public void setUp() throws Exception {

  final InvertedIndexMapper mapper = new InvertedIndexMapper();
  final InvertedIndexReducer reducer = new InvertedIndexReducer();

  mapDriver = MapDriver.newMapDriver(mapper);
  reduceDriver = ReduceDriver.newReduceDriver(reducer);
  mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
}

@Test
public void testMapperWithSingleKeyAndValue() throws Exception {
  final LongWritable inputKey = new LongWritable(0);
  final Text inputValue = new Text("www.kroger.com,groceries,clothes");

  final Text outputKey = new Text("groceries");
  final Text outputValue = new Text("www.kroger.com");

  mapDriver.withInput(inputKey, inputValue);
  mapDriver.withOutput(outputKey, outputValue);
  mapDriver.runTest();

 }

@Test
public void testMapperWithSingleInputAndMultipleOutput() throws Exception {
  final LongWritable key = new LongWritable(0);
 mapDriver.withInput(key, new Text("www.amazon.com,books,music,toys,ebooks,movies,computers"));
  final List<Pair<Text, Text>> result = mapDriver.run();

  final Pair<Text, Text> books = new Pair<Text, Text>(new Text("books"), new Text("www.amazon.com"));
  final Pair<Text, Text> toys = new Pair<Text, Text>(new Text("toys"), new Text("www.amazon.com"));

 assertThat(result)
  .isNotNull()
  .hasSize(6)
  .contains(books, toys);
}

@Test
public void testReducer() throws Exception {
 final Text inputKey = new Text("books");
 final ImmutableList<Text> inputValue = ImmutableList.of(new Text("www.amazon.com"), new Text("www.ebay.com"));

 reduceDriver.withInput(inputKey,inputValue);
 final List<Pair<Text, Text>> result = reduceDriver.run();
 //final Pair<Text, Text> pair2 = new Pair<Text, Text>(inputKey, new Text("www.amazon.com,www.ebay.com"));
  reduceDriver.withOutput(inputKey, new Text("www.amazon.com,www.ebay.com"));
 /* assertThat(result)
  .isNotNull()
  .hasSize(1)
  .containsExactly(pair2); */
  reduceDriver.runTest();
  }
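 // Note: the mapReduceDriver built in setUp() is not exercised by the tests above. A minimal
 // end-to-end sketch (not part of the original listing) could look like this:
@Test
public void testMapperAndReducerTogether() throws Exception {
  mapReduceDriver.withInput(new LongWritable(0), new Text("www.amazon.com,books"));
  mapReduceDriver.withInput(new LongWritable(1), new Text("www.ebay.com,books"));
  // Both retailers end up grouped under the single key "books" after the reduce step.
  mapReduceDriver.withOutput(new Text("books"), new Text("www.amazon.com,www.ebay.com"));
  mapReduceDriver.runTest();
 }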

 }
Note: Add the MRUnit jar and its dependent jars to the classpath.

Friday 17 October 2014

Speeding up a Pig+HBase MapReduce job by a factor of 15

The other day I ran a Pig script. Nothing fancy; I loaded some data into HBase and then ran a second Pig job to do some aggregations. I knew the data loading would take some time as it was multiple GB of data, but I expected the second aggregation job to run much faster. It ran for over 15 hours and was still not done, which was too long in my mind, so I terminated it. I was using Amazon Elastic MapReduce as my Hadoop environment, so I could have added more resources, but I wasn’t sure that would do me any good. I took a look at the performance breakdowns, and within minutes I found the problem. I also realized that even an expert would have had a hard time identifying it. What’s more, the go-to fix for more performance in Hadoop, which is adding more hardware, would not have helped at all!

Pig vs. Hive

Pig is one of the two predominant ways to access, transform and query data in Hadoop. Whereas Hive uses a SQL-like approach that appeals to database people and analysts, Pig is a scripting language. Its appeal is its procedural approach and its ability to transform data in a pipeline fashion, which appeals to ETL (Extract, Transform, Load) experts. Pig is typically used to load data into Hadoop and perform complex transformations and aggregations that would be hard to do in a single Hive query. Both systems generate one or more MapReduce jobs and therein lies their power and complexity.

My MapReduce Job

For demonstration purposes I simply used a kmer index job to reproduce the issue; if you are interested in running it yourself, get it from: https://github.com/lila/emr-hbase-sample
The job consists of two Amazon EMR steps in a single job flow. The first Pig script loads multiple gigabytes of genome sequence data into a single HBase table (I added some extra FASTA sample genome files compared to the original sample). The second Pig job produces a size-based index. The respective Pig Latin looks like this:
A = LOAD 'hbase://kmer'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'sequence:*', '-loadKey true')
AS (id:bytearray, sequence:map[]);
B = foreach A generate id, SIZE(sequence);
C = group B by $1 PARALLEL $p;
D = foreach C generate group, COUNT(B);
I ran it on an empty HBase table and looked at the performance results.

The Loading Job

As mentioned, the first step was a Pig job that loaded several gigabytes of data into a simple HBase table. When looking at it we can see a couple of things immediately:
The MapReduce Task is sending inserts to multiple HBase Region servers, thus not storing the data locally
First of all the HBase data is not stored locally. This is not much of a surprise. The Pig script is loading data from a file and gets distributed evenly across the Hadoop cluster based on that input data. That distribution is of course not the same as the resulting region distribution in HBase. Secondly we also notice that for the first 10 minutes only a single HBase region server is hit, then a second and only after 14 minutes do all 5 HBase region servers process data.
At the beginning of the job only a single HBase region server consumes CPU while all others remain idle
This is also well documented and has to do with HBase regions and splits (read this very good blog by Hortonworks if you want to know more).
And lastly we also see that this was a map-only job.
When looking at the break down in time we see that the kmerLoad jobs do not spend any time in reducing or shuffling but only in the mapping phase
More specifically, we spend nearly all of the time in HBase. As we are loading data into HBase, we would expect it to be the primary contributor.
This break down shows that all MapTasks are processing and spend nearly all their time sending data to HBase
Next I took a look at the HBase inserts. We can see how many items we processed and inserted into HBase:
This shows the inserts per region server and HBase table/column family. We also see the number of roundtrips compared to the number of rows inserted
A couple of things are interesting here. First of all we see that the number of inserts is not equally distributed across the region servers. That is again due to the fact that the table was empty and HBase started with a single region (see the previous screenshots and the Hortonworks blog). We can also compare the number of inserted rows with the number of roundtrips, making sure that we leverage batching correctly.
When comparing this with the number of mapped records we see that we are doing roughly 27 inserts per input record and each output record corresponds to a single HBase row.
This shows details about the MapReduce job and several counters. In this case we see the mapped records.
In summary, we can say that the loading is already pretty optimal; the CPU load on the cluster is fine and we insert ~1700 rows into HBase per roundtrip.
This shows a single task attempt inserting rows into HBase, we can see that it inserts many rows in a single roundtrip
Together with the fact that HBase really was the main contributor, this means there was not much I could do to speed things up, at least not without some serious expert help (suggestions on how to improve this beyond pre-splitting are welcome). So let’s move on to the index job.

The Index Job

The second job was creating a size-based index over all inserted data; hence it had to read all data in the table. This second step ran for 15 hours without completing, which is when I terminated it. By analyzing it I saw the root cause almost immediately.
The Database Overview shows a single Scan that makes a roundtrip to the HBase region server for every single row it retrieves
Look at the roundtrips and the row counts processed. There is one more roundtrip than there are rows! That means the MapReduce job is calling out to the HBase region server for every single row, which is very inefficient.
The default scan caching on a Pig HBase job should be 100, but that was obviously not the case. The performance breakdown showed me that we were indeed spending all of our time waiting on HBase.
The HBase region server contributes over 95% to our job execution time
This flow also shows that we spend 95% of our time waiting on HBase
Because we are making one network roundtrip for every single row, the utilization of my cluster was miserable.
The CPU utilization in the Hadoop cluster is bad, it only has a couple of spikes which seem to coincide with HBase region splits
Therefore adding more hardware would not have helped much – it was not the lack of resources that made my indexing job slow.
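Scanner caching is a plain HBase client setting: it controls how many rows a scanner fetches from the region server per roundtrip. The Java sketch below is not code from the job itself, just an illustration of what the setting means at the client level (the table and column family names are taken from the Pig script above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanCachingSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "kmer");        // table name from the Pig script
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes("sequence"));
    // caching = 1 means one RPC per row (what the job was effectively doing);
    // caching = 500 lets the client pull 500 rows back per roundtrip.
    scan.setCaching(500);
    ResultScanner scanner = table.getScanner(scan);
    for (Result row : scanner) {
      // process the row ...
    }
    scanner.close();
    table.close();
  }
}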

Pig really doesn’t like HBase…

According to the documentation, HBaseStorage has an option to set the scanner caching, which should lead to fewer roundtrips and more rows fetched in a single roundtrip. This is how it looks in the Pig script:
A = LOAD 'hbase://kmer'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'sequence:*', '-loadKey true -caching 500')
AS (id:bytearray, sequence:map[]);
B = foreach A generate id, SIZE(sequence);
C = group B by $1 PARALLEL $p;
D = foreach C generate group, COUNT(B);
I ran a smaller sample to test this option. However, when I looked at the result, I saw that it was still reading one row at a time.
We see that the HBase scan is still doing a roundtrip for every single row it retrieves
Now I was really curious. I added a couple of sensors to get more visibility, and what I found was that HBaseStorage did pick up my caching configuration option as defined in the script, but the actual HBase Scanner was still being configured to use 1 as the cache size!
The two selected rows show that while the Pig option (the first) retrieves the caching option defined in my Pig script, the Scanner itself still gets caching size 1 (second selected row)
I did not give up there though! Pig also has a way to set global MapReduce job options, which is what I tried next. This can be done by adding the following to the Pig script:
SET hbase.client.scanner.caching 1000;
It didn’t have any effect either. By looking at the job tracker, I could verify that the setting made it into the job.xml submitted to Hadoop, but I also verified that the Map Task itself was not aware of that setting at all.
This shows that the hbase.client.scanner.caching job option always returns 1, although the job.xml is set to 1000
Finally I launched a new Hadoop cluster via EMR and added the HBase option in the bootstrap script, making it a cluster wide configuration setting:
--bootstrap-action s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hbase --args -s,hbase.client.scanner.caching=500
I ran the job again, and wow was it fast. My sample subset went down from 9 minutes to 2! My overall map time went down from 7 minutes to 25 seconds!
This shows the same job executed twice, once with the caching option not working and once with a cluster wide setting that did work. The red rectangle shows the dramatic effect this has on map time
Satisfied that the scanner caching option finally worked, I ran my big job again. The improvement was massive. It went down from over 15 hours (where I terminated it!) to 75 minutes! That is over 15 times faster on the same hardware!
Looking at the transaction flow we see that we still spend most of our time in HBase, but it went down from over 95% to 72%. The shift meant that a lot more processing was done on the MapReduce side itself. To put this into context, this means that a lot more time was spent in the aggregation part of the Pig script.
This transaction flow shows that we now spend only 72% of the job time waiting on HBase and much more time doing actual processing
Thus we’ve turned this job from one mostly waiting on HBase and network to one that is actually leveraging the CPU. As a result the utilization of my cluster is much better now, albeit far from optimal.
The CPU load on the cluster is much better now, compared to before.
There is still a lot of room for improvement: CPU utilization of my Hadoop nodes is not close to 100%, so we might be able to make this even faster, but that is work for another day. What’s important is that without the insight our APM solution gives us, I would never have known how to fix this!

Conclusion

While a Pig or Hadoop expert might have told me right away to set the caching option, even they would not have easily figured out why the option didn’t take effect. With APM I was able to verify the number of roundtrips and immediately saw that the option didn’t work; I didn’t need to spend hours or days to realize that. While I do not know why the option doesn’t work (any Hadoop developers here who can tell us?), I was able to remedy the situation and also verify that the site-wide option had the desired effect – with dramatic improvements!
When I ask customers how they deal with performance problems in MapReduce, they often tell me that, beyond generic Hadoop tuning, they don’t bother. They simply add more hardware, because they cannot have experts spending days or weeks browsing through logs or looking at every job they execute. In this case, adding more hardware would not have helped, as the existing cluster was well underutilized. By leveraging an APM solution I was able to figure out the root cause, see that the documented fix was not working, and come up with an alternative solution within an hour! I sped up my job by a factor of 15, without adding any more hardware to my cluster and without needing to have an expert go through log files!

Eating our own dog food – 2x faster Hadoop MapReduce Jobs

For a while now I have been writing about how to analyze and optimize Hadoop jobs beyond just tweaking MapReduce options. The other day I took a look at some of our Outage Analyzer Hadoop jobs and put words into action.
A simple analysis of the Outage Analyzer jobs with Compuware APM 5.5 identified three hotspots and two potential Hadoop problems in one of our biggest jobs. It took the responsible developer a couple of hours to fix them, and the result is a 2x improvement overall and a 6x improvement on the reduce part of the job. Let’s see how we achieved that.

About Outage Analyzer

Outage Analyzer is a free service provided by Compuware which displays, in real time, any availability problems with the most popular 3rd-party content providers on the Internet. It is available at http://www.outageanalyzer.com. It uses real-time analytics technologies to do anomaly detection, event correlation and classification. Every day it stores billions of measurements taken from Compuware’s global testing network in Hadoop and runs different MapReduce jobs to analyze the data. I examined the performance of these MapReduce jobs.

Identifying worthwhile jobs to analyze

The first thing I did was look for a worthwhile job to analyze. To do this, I looked at cluster utilization broken down by user and job.
This chart visualizes the cluster CPU usage by user giving a good indication about which user executes the most expensive jobs
What I found was that John was the biggest user of our cluster. So I looked at the jobs John was running.
These are all the jobs that John was running over the last several days. It's always the same one, consuming about the same amount of resources
The largest job by far was an analytics simulation of a full day of measurement data.  This job is run often to test and tune changes to the analytics algorithms.  Except for one of the runs, all of them lasted about 6.5 hours in real time and consumed nearly 100% CPU of the cluster during that time. This looked like a worthy candidate for further analysis.

Identifying which Job Phase to focus on

My next step was to look at a breakdown of the job from two angles: consumption and real time. From a real-time perspective, map and reduce took about the same amount of time – roughly 3 hours each. This could also be nicely seen in the resource usage of the job.
This dashboard shows the overall cluster utilization during the time the job ran. 100% of the cluster CPU is used during most of the time
The significant drop in Load Average and the smaller drop in the other charts mark the end of the mapping phase and the start of pure reducing. What is immediately obvious is that the reducing phase, while lasting about the same time, does not consume as many resources as the map phase. The Load Average is significantly lower and the CPU utilization drops in several steps before the job is finished.
On the one hand that is because we have a priority scheduler and reducing does not use all slots, but more importantly, reducing cannot be parallelized as much as mapping. Every optimization here counts twofold, because you cannot scale things away! While the mapping phase is clearly consuming more resources, the reducing phase is a bottleneck and might therefore benefit even more from optimization.
The breakdown of job phase times shows that the mapping phase consumes twice as much time as reducing, even though we know that the job real time of the two phases is about the same – 3 hours each
As we can see the Map Time (time we spend in the mapping function, excluding merging, spilling and combining) is twice as high as the reduce time. The reduce time here represents the time that tasks were actually spending in the reduce function, excluding shuffle and merge time (which is represented separately). As such those two times represent those portions of the job that are directly impacted by the Map and Reduce code, which is usually custom code – and therefore tuneable by our developers!

Analyzing Map and Reduce Performance

So as a next step I used Compuware APM to get a high-level performance breakdown of the job’s respective 3 hour mapping and reducing phases. A single click gave me this pretty clear picture of the mapping phase:
This is a hot spot analysis of our 3 hour mapping phase which ran across 10 servers in our Hadoop cluster
The hot spot dashboard for the mapping phase shows that we spent the majority of the time (about 70%) in our own code and that it’s about 50% CPU time. This indicates a lot of potential for improvement. Next, I looked at the reducing phase.
This hot spot shows that we spend nearly all of our reducing time in all reduce tasks in our own code!
This shows that 99% of the reducing time is spent on our own code and not in any Hadoop framework.  Since the reduce phase was clearly the winner in terms of potential, I looked at the details of that hot spot – and immediately found three hot spots that were responsible for the majority of the reduce time.

Three simple code issues consume 70% of the reduce time

This is what the method hot spots dashboard for the reduce phase showed.
These are the method hot spots for the reduce phase, showing that nearly everything is down to only 3 line items
The top three items in the method hot spot told me everything I needed to know. As it turned out, nearly all other items listed were sub-hotspots of the topmost method:
  1. SimpleDateFormat initialization:
    The 5 items marked in red are all due to the creation of a SimpleDateFormat object. As most of us find out very painfully during our early career as a Java developer, the SimpleDateFormat is not thread safe and cannot be used as a static variable easily. This is why the developer chose the easiest route and created a new one for every call, leading to about 1.5 billion creations of this object. The initialization of the Formatter is actually very expensive and involves resource lookups, locale lookups and time calculations (seen in the separate line items listed here). This item alone consumed about 40% of our reduce time.
    Solution: We chose to use the well-known Joda framework (the code replacement was easy) and made the Formatter a static final variable; totally removing this big hot spot from the equation.
  2. Regular Expression Matching (line two in the picture)
    In order to split the CSV input we were using java.lang.String.split. It is often forgotten that this method uses regular expressions underneath. RegEx is rather CPU intensive and overkill for such a simple job. This was consuming another 15-20% of the allotted CPU time.
    Solution: We changed this to a simple string tokenizer.
  3.  Exception throwing (line three in the picture)
    This example was especially interesting. During the reading of input data we are parsing numeric values, and if the field is not a correct number java.lang.Long.parseLong will throw a NumberFormatException. Our code would catch it, mark the field as invalid and ignore the exception. The fact is that nearly every input record in that feed has an invalid field or an empty field that should contain a number. Throwing this exception billions of times consumed another 10% of our CPU time.
    Solution: We changed the code in a way to avoid the exception altogether.
And there we have it – three simple hot spots were consuming about 70% of our reduce CPU time! During analysis of the mapping portion I found the same hot spots again, where they contributed about 20-30% to the CPU time.
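For illustration, here is a condensed, hypothetical sketch of what the three fixes can look like in code; the class, method and pattern names are invented, not taken from the actual Outage Analyzer code:

import org.apache.commons.lang.StringUtils;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class RecordParser {

    // Fix 1: Joda-Time formatters are thread-safe, so a single static final instance
    // replaces the ~1.5 billion SimpleDateFormat constructions.
    private static final DateTimeFormatter TIMESTAMP = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

    public static long parseTimestamp(String field) {
        return TIMESTAMP.parseDateTime(field).getMillis();
    }

    // Fix 2: a plain character-based split instead of String.split(), which compiles
    // and applies a regular expression on every call.
    public static String[] splitCsv(String line) {
        return StringUtils.split(line, ',');
    }

    // Fix 3: validate the field up front instead of letting NumberFormatException
    // fly billions of times.
    public static long parseNumericField(String field, long fallback) {
        if (field == null || field.length() == 0) {
            return fallback;
        }
        for (int i = 0; i < field.length(); i++) {
            char c = field.charAt(i);
            if (c < '0' || c > '9') {
                return fallback;    // invalid field, no exception thrown
            }
        }
        return Long.parseLong(field);
    }
}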
I sent this analysis to the developer and we decided to eat our own dog food, fix it and rerun the job to analyze the changes.

Job done in half the time – 6-fold improvement in reduce time!

The result of the modified code exceeded our expectations by quite a bit! The immediate changes saw the job time reduced by 50%. Instead of lasting about 6.5 hours, it was done after 3.5! Even more impressive was that while the mapping time only went down by about 15%, the reducing time was slashed from 3 hours to 30 minutes!
This is the job's cluster CPU usage and Load Average after we made the changes
The cluster utilization shows a very clear picture. The overall utilization and load average during the mapping phase actually increased a bit, and instead of lasting 3 hours 10 minutes it was done after 2 hours and 40 minutes. While not huge, this is still a 15% improvement.
The reduce phase on the other hand shrank dramatically: from roughly 3 hours to 30 minutes! That means a couple of hours of development work led to an impressive 6-fold performance improvement! We also see that the reduce phase is of course still not utilizing the whole cluster, and it's actually the 100% phase that got a lot shorter.

Conclusion

Three simple code fixes resulted in a 100% improvement of our biggest job and a 6-fold speedup of the reduce portion. Suffice it to say that this totally surprised the owners of the job. The job was utilizing 100% of the cluster, which for them meant that from a Hadoop perspective things were running in an optimal fashion. While this is true, it doesn’t mean that the job itself is efficient!
This example shows that optimizing MapReduce jobs beyond tinkering with Hadoop options can lead to a lot more efficiency without adding any more hardware – achieving the same result with fewer resources!
The hotspot analysis also revealed some Hadoop-specific hotspots that led us to change some job options and speed things up even more. More on that in my next blog.

Thursday 31 July 2014

Big Data Basics - Part 6 - Related Apache Projects in Hadoop Ecosystem


Problem

I have read the previous tips in the Big Data Basics series including the storage (HDFS) and computation (MapReduce) aspects. After reading through those tips, I understand that HDFS and MapReduce are the core components of Hadoop. Now, I want to know about other components that are part of the Hadoop Ecosystem.

Solution

In this tip we will take a look at some of the other popular Apache Projects that are part of the Hadoop Ecosystem.

Hadoop Ecosystem

As we learned in the previous tips, HDFS and MapReduce are the two core components of the Hadoop Ecosystem and are at the heart of the Hadoop framework. Now it's time to take a look at some of the other Apache projects which are built around the Hadoop framework and are part of the Hadoop Ecosystem. The following diagram shows some of the most popular Apache projects/frameworks that are part of the Hadoop Ecosystem.
Apache Hadoop Ecosystem
Next let us get an overview of each of the projects represented in the above diagram.

Apache Pig

Apache Pig is a software framework which offers a run-time environment for execution of MapReduce jobs on a Hadoop Cluster via a high-level scripting language called Pig Latin. The following are a few highlights of this project:
  • Pig is an abstraction (high level programming language) on top of a Hadoop cluster.
  • Pig Latin queries/commands are compiled into one or more MapReduce jobs and then executed on a Hadoop cluster.
  • Just like a real pig can eat almost anything, Apache Pig can operate on almost any kind of data.
  • Pig offers a shell called the Grunt shell for executing Pig commands.
  • DUMP and STORE are two of the most common commands in Pig. DUMP displays the results to screen and STORE stores the results to HDFS.
  • Pig offers various built-in operators, functions and other constructs for performing many common operations.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Apache Hive

The Apache Hive data warehouse framework facilitates querying and managing large datasets residing in a distributed store/file system like the Hadoop Distributed File System (HDFS). The following are a few highlights of this project:
  • Hive offers a technique to map a tabular structure on to data stored in distributed storage.
  • Hive supports most of the data types available in many popular relational database platforms.
  • Hive has various built-in functions, types, etc. for handling many commonly performed operations.
  • Hive allows querying of the data from distributed storage through the mapped tabular structure.
  • Hive offers various features, which are similar to relational databases, like partitioning, indexing, external tables, etc.
  • Hive manages its internal data (system catalog) like metadata about Hive Tables, Partitioning information, etc. in a separate database known as Hive Metastore.
  • Hive queries are written in a SQL-like language known as HiveQL.
  • Hive also allows plugging in custom mappers, custom reducers, custom user-defined functions, etc. to perform more sophisticated operations.
  • HiveQL queries are executed via MapReduce. Meaning, when a HiveQL query is issued, it triggers a Map and/or Reduce job(s) to perform the operation defined in the query.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Apache Mahout

Apache Mahout is a scalable machine learning and data mining library. The following are a few highlights of this project:
  • Mahout implements the machine learning and data mining algorithms using MapReduce.
  • Mahout has 4 major categories of algorithms: Collaborative Filtering, Classification, Clustering, and Dimensionality Reduction.
  • The Mahout library contains two types of algorithms: ones that can run in local mode and others that run in a distributed fashion.
  • More information on Algorithms: Mahout Algorithms.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Apache HBase

Apache HBase is a distributed, versioned, column-oriented, scalable big data store on top of Hadoop/HDFS. The following are a few highlights of this project:
  • HBase is based on Google's BigTable concept.
  • Runs on top of Hadoop and HDFS in a distributed fashion.
  • Supports Billions of Rows and Millions of Columns.
  • Runs on a cluster of commodity hardware and scales linearly.
  • Offers consistent reads and writes.
  • Offers easy-to-use Java APIs for client access (see the short sketch below).
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists
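As a taste of that Java client API, a minimal put-then-get might look like the sketch below (the table, column family and qualifier names are made up and the table must already exist; shown with the classic HTable-based client):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "demo_table");
    // Write one cell: row "row1", column family "cf", qualifier "col", value "hello"
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes("hello"));
    table.put(put);
    // Read it back
    Result result = table.get(new Get(Bytes.toBytes("row1")));
    System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))));
    table.close();
  }
}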

Apache Sqoop

Apache Sqoop is a tool designed for efficiently transferring data between Hadoop and relational databases (RDBMS). The following are a few highlights of this project:
  • Sqoop can efficiently transfer bulk data between HDFS and Relational Databases.
  • Sqoop allows importing the data into HDFS in an incremental fashion.
  • Sqoop can import and export data to and from HDFS, Hive, Relational Databases and Data Warehouses.
  • Sqoop uses MapReduce to import and export data, thereby effectively utilizing the parallelism and fault tolerance of Hadoop.
  • Sqoop offers a command-line interface, commonly referred to as the Sqoop command line.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Apache Oozie

Apache Oozie is a job workflow scheduling and coordination manager for managing the jobs executed on Hadoop. The following are a few highlights of this project:
  • Oozie can include both MapReduce as well as Non-MapReduce jobs.
  • Oozie is integrated with Hadoop and is an integral part of the Hadoop Ecosystem.
  • Oozie supports various jobs out of the box including MapReduce, Pig, Hive, Sqoop, etc.
  • Oozie jobs are scheduled/recurring jobs and are executed based on scheduled frequency and availability of data.
  • Oozie jobs are organized/arranged in a Directed Acyclic Graph (DAG) fashion.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Apache ZooKeeper

Apache ZooKeeper is an open source coordination service for distributed applications. The following are a few highlights of this project:
  • ZooKeeper is designed to be a centralized service.
  • ZooKeeper is responsible for maintaining configuration information, offering coordination in a distributed fashion, and a host of other capabilities.
  • ZooKeeper offers necessary tools for writing distributed applications which can coordinate effectively.
  • ZooKeeper simplifies the development of distributed applications.
  • ZooKeeper is being used by some of the Apache projects like HBase to offer high availability and high degree of coordination in a distributed environment.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Apache Ambari

Apache Ambari is an open source software framework for provisioning, managing, and monitoring Hadoop clusters. The following are a few highlights of this project:
  • Ambari is useful for installing Hadoop services across different nodes of the cluster and handling the configuration of Hadoop Services on the cluster.
  • Ambari offers centralized management of the cluster including configuration and re-configuration of services, starting and stopping of cluster and a lot more.
  • Ambari offers a dashboard for monitoring the overall health of the cluster.
  • Ambari offers an alerting and email mechanism to draw the required attention when needed.
  • Ambari offers REST APIs to developers for application integration.
Additional Information: Home Page | Wiki | Documentation/User Guide/Reference Manual | Mailing Lists

Conclusion

These are some of the popular Apache Projects. Apart from those, there are various other Apache Projects that are built around the Hadoop framework and have become part of the Hadoop Ecosystem. Some of these projects include
  • Apache Avro - An open source framework for remote procedure calls (RPC), data serialization and data exchange
  • Apache Spark - A fast and general engine for large-scale data processing
  • Apache Cassandra - A distributed NoSQL big data database

Next Steps
  • Explore more about Big Data and Hadoop.
  • Explore more about various Apache Projects.
