Saturday 5 July 2014

Hadoop Performance Tuning Best Practices

Tuning Hadoop run-time parameters

Hadoop provides a set of options on CPU, memory, disk, and network for performance tuning. Most Hadoop tasks are not CPU bound; what we usually look into is optimizing the usage of memory and disk spills.

Memory tuning

The general rule for memory tuning is: use as much memory as you can, but don’t trigger swapping. The parameter you can set for task memory is mapred.child.java.opts. You can put it in your configuration file.


<property>
    <name>mapred.child.java.opts</name>
    <value>-Xms1024M -Xmx2048M</value>
</property>
You can find the best memory parameters by monitoring memory usage on the servers using Ganglia, Cloudera Manager, or Nagios. Cloudera has a slide deck focused on memory usage tuning; the link is here

Minimize the map disk spill

Disk IO is usually the performance bottleneck. There are a lot of parameters you can tune to minimize spilling. The ones I use the most are:
  • Compress mapper output
  • Use 70% of heap memory for the spill buffer in the mapper
In your configuration file, you can write:


<property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
</property>
<property>
    <name>mapred.map.output.compression.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>
<property>
    <name>io.sort.mb</name>
    <value>800</value>
</property>
Although you can further tune the reducer buffer, the mapper sort record percent, and various other parameters, I found the best thing to do is to reduce the mapper output size. Most of the time, the performance is fast enough after I refactor the mapper to output as little data as possible. For more information, check the same Cloudera performance tuning guide.

Tuning mapper tasks

Unlike reducer tasks, for which you can specify the number of reducers, the number of mapper tasks is set implicitly. The tuning goal for mappers is to control the number of mappers and the size of each job. When dealing with large files, Hadoop splits the file into smaller chunks so that mappers can run on them in parallel. However, initializing a new mapper task usually takes a few seconds, which is also an overhead we want to minimize. These are the things you can do:
  • Reuse the JVM for mapper tasks
  • If the average mapper running time is shorter than one minute, increase mapred.min.split.size so that fewer mappers are allocated and the mapper initialization overhead is reduced.
  • Use CombineFileInputFormat for a bunch of smaller files. I had an implementation that also uses mapred.min.split.size to implicitly control the mapper size.
The configuration file would look like this:


<property>
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>-1</value>
</property>
<property>
    <name>mapred.max.split.size</name>
    <value>268435456</value>
</property>
<property>
    <name>mapred.min.split.size</name>
    <value>134217728</value>
</property>

Use configuration file and command line arguments to set parameters

When I first started on Hadoop, I set those parameters up inside the Java program, but that approach is hard-coded and inflexible. Thankfully, Hadoop provides the Tool interface and the ToolRunner class to parse those parameters for you. Here's a sample program:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExampleJob extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new ExampleJob(), args));
  }

  public int run(String[] args) throws Exception {
    Configuration conf = getConf(); // populated by ToolRunner from -conf/-D arguments
    Job job = new Job(conf);
    // configure the rest of the job (mapper, reducer, input/output paths, ...)
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
If your main class implements the interface, your program can take the config file as input:


hadoop jar ExampleJob-0.0.1.jar ExampleJob -conf my-conf.xml arg0 arg1
You can even pass extra parameters through the command line like this:


hadoop jar ExampleJob-0.0.1.jar ExampleJob -Dmapred.reduce.tasks=20 arg0 arg1
Setting configuration through run-time arguments makes it easier to test different parameters without recompiling the program.

Tuning application-specific performance

Beyond general Hadoop parameter setup, you can optimize your map-reduce program using some small tricks. Here are the tricks that I use the most.

Minimize your mapper output

Recall that mapper spill size is a serious performance bottleneck. The size of the mapper output affects disk IO, network IO, and memory usage in the shuffle phase. Minimizing the mapper output can improve the general performance a lot.
To do this, you can try the following (a mapper sketch follows this list):
  1. Filter out records on the mapper side, not on the reducer side.
  2. Use minimal data to form your map output key and map output value.
  3. Extend the BinaryComparable interface or use Text for your map output key.
  4. Set the mapper output to be compressed.
Of all the optimization tips, I found this one makes the biggest difference to many of my tasks, unless I can't find a smaller key to reduce the mapper output.
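As a rough sketch of tips 1 and 2 (the ErrorCountMapper name, the tab-separated log format, and the ERROR filter are assumptions for illustration, not from a specific job in this post), the mapper below drops uninteresting records early and emits only the small field it groups on instead of forwarding whole log lines; compression (tip 4) is handled by the configuration shown earlier.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ErrorCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text outKey = new Text();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    if (!line.contains("ERROR")) {     // tip 1: filter on the mapper side
      return;
    }
    String[] fields = line.split("\t");
    outKey.set(fields[0]);             // tip 2: emit only the field we group on
    context.write(outKey, ONE);        // small value instead of the whole line
  }
}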

Balancing reducer’s loading

Another common performance issue that you might encounter is unbalanced reducer tasks: one or several reducers take most of the output from the mappers and run extremely long compared to the other reducers.
To solve this, you can either
  1. Implement a better hash function in a custom Partitioner class (see the sketch after this list).
  2. If you know which keys are causing the issue, write a preprocessing job to separate those keys using MultipleOutputs, then use another map-reduce job to process the special keys that cause the problem.
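As a rough sketch of option 1 (the Text/IntWritable types and the FNV-1a hash are my own choices for illustration, not from the original post), a custom Partitioner can replace the default hash with one that mixes short, similar-looking keys more evenly:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class BetterHashPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // FNV-1a over the key bytes; mixes bits better than the default
    // hashCode() for short keys that share long common prefixes.
    int hash = 0x811C9DC5;
    byte[] bytes = key.getBytes();      // backing array, valid up to getLength()
    for (int i = 0; i < key.getLength(); i++) {
      hash ^= (bytes[i] & 0xFF);
      hash *= 0x01000193;
    }
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }
}

Register it with job.setPartitionerClass(BetterHashPartitioner.class). A better hash only helps when the skew comes from many keys landing in the same partition; if a single hot key dominates, option 2 is the way to go.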

Conclusion

It’s fun to write raw map-reduce jobs because it gives you more precise control over performance tuning. If you have already worked with Hive or Pig, I encourage you to try optimizing the same job using raw map-reduce. You can find a lot of performance gains and more room to tune. If you are curious for more, you can also check Yahoo's Hadoop performance tuning guides.

Process Small Files on Hadoop Using CombineFileInputFormat

From Cloudera’s blog:
A small file is one which is significantly smaller than the HDFS block size (default 64MB). If you’re storing small files, then you probably have lots of them (otherwise you wouldn’t turn to Hadoop), and the problem is that HDFS can’t handle lots of files.
In my benchmark, just using a custom CombineFileInputFormat can speed the program up from 3 hours to 23 minutes, and after some further tuning, the same task can be run in 6 minutes!

Benchmark Setup

To test the raw performance of different approaches to the small-files problem, I set up a map-only Hadoop job that basically just does a grep and performs a small binary search. The binary search part generates the reduce-side keys that I'll use in further data processing; it takes only a little resource (an 8 MB index) to run, so it does not affect the result of the benchmark.
The data to process is some server log data, 53.1 GB in total. The Hadoop cluster consists of 6 nodes, running Hadoop version 1.1.2. In this benchmark I implemented CombineFileInputFormat to shrink the number of map tasks; I also tested the difference between reusing the JVM or not, and different block sizes for combining files.

CombineFileInputFormat

The code listed here is modified from the Hadoop example code. To use CombineFileInputFormat you need to implement three classes. The class CombineFileInputFormat is an abstract class with no implementation, so you must create a subclass to support it; we'll name the subclass CFInputFormat. The subclass will instantiate a delegate CFRecordReader that extends RecordReader; this is the code that does the file processing logic. We'll also need a class FileLineWritable, which replaces the LongWritable normally used as the key to file lines.

CFInputFormat.java

CFInputFormat.java doesn't do much. You implement createRecordReader to pass in the record reader that does the combine-file logic, and that's all. Note that you can call setMaxSplitSize in the constructor to control the size of each chunk of files; if you don't want files split in half, remember to return false in the isSplitable method, which defaults to true.


package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import com.orienit.kalyan.hadoop.training.combinefiles.CFRecordReader;
import com.orienit.kalyan.hadoop.training.combinefiles.FileLineWritable;

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {
  public CFInputFormat(){
    super();
    setMaxSplitSize(67108864); // 64 MB, default block size on hadoop
  }
  public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException{
    return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit)split, context, CFRecordReader.class);
  }
  @Override
  protected boolean isSplitable(JobContext context, Path file){
    return false;
  }
}

CFRecordReader.java

CFRecordReader is a delegate class of CombineFileRecordReader, a built-in class that passes each split (typically a whole file in this case) to our class CFRecordReader. When the Hadoop job starts, CombineFileRecordReader reads the sizes of all the files in HDFS that we want it to process and decides how many splits to create based on the MaxSplitSize we defined in CFInputFormat. For every split (which must be a whole file, because we set isSplitable to false), CombineFileRecordReader creates a CFRecordReader instance via a custom constructor, passing in the CombineFileSplit, the context, and the index for CFRecordReader to locate the file to process.
When processing the file, CFRecordReader creates a FileLineWritable as the key for the Hadoop mapper class. For each line, the FileLineWritable consists of the file name and the offset of that line. The difference between FileLineWritable and the LongWritable normally used in a mapper is that LongWritable only denotes the offset of a line within a file, while FileLineWritable adds the file information to the key.


package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.IOException;
import com.orienit.kalyan.hadoop.training.combinefiles.FileLineWritable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;


public class CFRecordReader extends RecordReader<FileLineWritable, Text>{
  private long startOffset;
  private long end;
  private long pos;
  private FileSystem fs;
  private Path path;
  private FileLineWritable key;
  private Text value;

  private FSDataInputStream fileIn;
  private LineReader reader;

public CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException{
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);

  fileIn = fs.open(path);
  reader = new LineReader(fileIn);
  this.pos = startOffset;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
    throws IOException, InterruptedException {
  // Won't be called, use custom Constructor
  // `CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)`
  // instead
}

@Override
public void close() throws IOException {}

@Override
public float getProgress() throws IOException{
  if (startOffset == end) {
    return 0;
  }
  return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
}

@Override
public FileLineWritable getCurrentKey() throws IOException, InterruptedException {
  return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
  return value;
}

@Override
public boolean nextKeyValue() throws IOException{
  if (key == null) {
    key = new FileLineWritable();
    key.fileName = path.getName();
  }
  key.offset = pos;
  if (value == null){
    value = new Text();
  }
  int newSize = 0;
  if (pos < end) {
    newSize = reader.readLine(value);
    pos += newSize;
  }
  if (newSize == 0) {
    key = null;
    value = null;
    return false;
  } else{
    return true;
  }
}
}
The reason for using a custom constructor is not documented anywhere in the Hadoop API or its documentation. You can only find it in the Hadoop source code of CombineFileRecordReader, at line 40:


   static final Class [] constructorSignature = new Class []
                                          {CombineFileSplit.class,
                                           TaskAttemptContext.class,
                                           Integer.class};

FileLineWritable.java

This file is very simple: it stores the file name and offset, and overrides the compareTo method to compare the file name first, then the offset.


package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class FileLineWritable implements WritableComparable<FileLineWritable>{
  public long offset;
  public String fileName;

  public void readFields(DataInput in) throws IOException {
    this.offset = in.readLong();
    this.fileName = Text.readString(in);
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(offset);
    Text.writeString(out, fileName);
  }

  public int compareTo(FileLineWritable that) {
    int cmp = this.fileName.compareTo(that.fileName);
    if (cmp != 0) return cmp;
    return (int)Math.signum((double)(this.offset - that.offset));
  }

  @Override
  public int hashCode() {               // generated hashCode()
    final int prime = 31;
    int result = 1;
    result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
    result = prime * result + (int) (offset ^ (offset >>> 32));
    return result;
  }

  @Override
  public boolean equals(Object obj) {  // generated equals()
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    FileLineWritable other = (FileLineWritable) obj;
    if (fileName == null) {
      if (other.fileName != null)
        return false;
    } else if (!fileName.equals(other.fileName))
      return false;
    if (offset != other.offset)
      return false;
    return true;
  }
}

job setup

Finally, here is the job setup for the Hadoop cluster to run. We just need to assign the classes to the job:


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// standard hadoop conf
Job job = new Job(getConf());
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(CFInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0); // map-only job
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.submit();


I ran several benchmarks and tuned the performance from 3 hours 34 minutes to 6 minutes 8 seconds!

Original job without any tuning

  • job_201406051010_0001
  • NumTasks: 9790
  • Reuse JVM: false
  • mean complete time: 05-Jul-2014 10:08:47 (17sec)
  • Finished in: 3hrs, 34mins, 26sec
We had 9790 files to process, and the total size of the files is 53 GB. Note that each task still took an average of 17 seconds to process its file.

Using CombineFileInputFormat without setting the MaxSplitSize

  • job_201406051010_0002
  • NumTasks: 1
  • Reuse JVM: false
In this benchmark I didn't set the MaxSplitSize in CFInputFormat.java, and thus Hadoop merged all the files into one super big task. After this task had run for 15 minutes, Hadoop killed it. Maybe it was a timeout issue; I didn't dig into this. The start and the end of the task logs look like this:
14/06/05 16:17:29 INFO mapred.JobClient:  map 0% reduce 0%
14/06/05 16:32:45 INFO mapred.JobClient:  map 40% reduce 0%
 
14/06/05 16:33:02 INFO mapred.JobClient: Task Id : attempt_201406051010_0002_m_000000_0, Status : FAILED
java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
    Caused by: java.io.IOException: Task process exit with nonzero status of 255.
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)

Using CombineFileInputFormat with block size 64 MB

  • job_201406051010_0003
  • Reuse JVM = false
  • max split size = 64MB
  • NumTasks: 760
  • mean complete time: 05-Jul-2014 16:55:02 (24sec)
  • Finished in: 23mins, 6sec
After setting MaxSplitSize, the total runtime dropped to 23 minutes! The number of tasks drops from 9790 to 760, about 12 times fewer. The runtime is 9.3 times faster, pretty nice! However, the mean complete time doesn't scale like the other factors. The reason is that starting the JVM over and over again is a big overhead.

Using CombineFileInputFormat with block size 64MB and reuse JVM

To reuse the JVM, just set mapred.job.reuse.jvm.num.tasks to -1.


  public static void main(String[] argv) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
    int res = ToolRunner.run(conf, new HadoopMain(), argv);
    System.exit(res);
  }
The result is awesome! 6 minutes and 8 seconds, wow!
  • job_201406051010_0004
  • Reuse JVM = true
  • max split size = 64MB
  • NumTasks: 760
  • mean complete time: 05-Jul-2014 17:30:23 (5sec)
  • Finished in: 6mins, 8sec

Use FileInputFormat and reuse JVM

Just curious about the performance difference if we only change the JVM reuse parameter:
  • job_201406051010_0005
  • NumTasks: 9790
  • mean complete time: 05-Jul-2014 17:04:18 (3sec)
  • Reuse JVM = true
  • Finished in: 24mins, 49sec

Tuning performance over block size

Let’s jump to the conclusion first: changing the block size doesn’t affect the performance that much, and I found 64 MB is the best size to use. Here are the benchmarks:

512 MB

  • job_201406051010_0006
  • Reuse JVM = true
  • max split size = 512MB
  • NumTasks: 99
  • mean complete time: 05-Jul-2014 11:55:26 (24sec)
  • Finished in: 7min 13sec

128 MB

  • job_201406051010_0007
  • Reuse JVM = true
  • max split size = 128 MB
  • NumTasks: 341
  • mean complete time: 05-Jul-2014 13:13:20 (9sec)
  • Finished in: 6mins, 41sec

Conclusion

So far the best practices I learned from these benchmarks are:
  1. Set the mapred.job.reuse.jvm.num.tasks flag in your configuration. This is the easiest tuning to do, and it gives nearly a 10x performance improvement.
  2. Write your own CombineFileInputFormat implementation.
  3. The block size can be 64 MB or 128 MB; there isn't a big difference between the two.
Still, try to model your problem as a SequenceFile or MapFile in Hadoop (a sketch of packing small files into a SequenceFile follows below). HDFS should handle locality for these files automatically. What about CFInputFormat? Does it handle locality in HDFS too? I can't confirm it, but I guess sorting the keys based on line offset first and then file name also keeps the data assigned to each mapper local. When I have time to dig more into the HDFS API, I'll come back to this benchmark and see what I can further tune in the program.
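For reference, here is a minimal sketch of the SequenceFile approach; the PackSmallFiles class, the command-line paths, and the choice of file name as key with raw bytes as value are assumptions for illustration, not something measured in this benchmark. It packs a directory of small files into a single SequenceFile so that HDFS only has to track one large file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class PackSmallFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path inputDir = new Path(args[0]);   // directory of small files
    Path output = new Path(args[1]);     // output SequenceFile

    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, output, Text.class, BytesWritable.class);
    try {
      for (FileStatus status : fs.listStatus(inputDir)) {
        byte[] content = new byte[(int) status.getLen()];
        FSDataInputStream in = fs.open(status.getPath());
        try {
          in.readFully(content);         // read the whole small file
        } finally {
          in.close();
        }
        // key = original file name, value = raw file contents
        writer.append(new Text(status.getPath().getName()),
                      new BytesWritable(content));
      }
    } finally {
      writer.close();
    }
  }
}

A later job can then read the packed file with SequenceFileInputFormat instead of scheduling thousands of tiny splits.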