
Thursday 31 July 2014

Big Data Basics - Part 5 - Introduction to MapReduce

Problem

I have read the previous tips in the Big Data Basics series including the storage aspects (HDFS). I am curious about the computation aspect of Hadoop and want to know what it is all about, how it works, and any other relevant information.

Solution

In this tip we will take a look at the second core component of the Hadoop framework, MapReduce. This component is responsible for computation / data processing.

Introduction

MapReduce is a programming model and software framework which allows us to process data in parallel across multiple computers in a cluster, often running on commodity hardware, in a reliable and fault-tolerant fashion.

Key Concepts

Here are some of the key concepts related to MapReduce.

Job

A Job in the context of Hadoop MapReduce is the unit of work to be performed as requested by the client / user. The information associated with the Job includes the data to be processed (input data), MapReduce logic / program / algorithm, and any other relevant configuration information necessary to execute the Job.

Task

Hadoop MapReduce divides a Job into multiple sub-jobs known as Tasks. These tasks can be run independently of each other on various nodes across the cluster. There are primarily two types of Tasks - Map Tasks and Reduce Tasks.

JobTracker

Just like the storage (HDFS), the computation (MapReduce) also works in a master-slave / master-worker fashion. A JobTracker node acts as the Master and is responsible for scheduling Tasks on appropriate nodes, coordinating their execution, sending the information needed to execute each task, collecting the results after each task completes, re-executing failed Tasks, and monitoring / maintaining the overall progress of the Job. Since a Job consists of multiple Tasks, a Job's progress depends on the status / progress of the Tasks associated with it. There is only one JobTracker node per Hadoop Cluster.

TaskTracker

A TaskTracker node acts as the Slave and is responsible for executing a Task assigned to it by the JobTracker. There is no restriction on the number of TaskTracker nodes that can exist in a Hadoop Cluster. A TaskTracker receives the information necessary to execute a Task from the JobTracker, executes the Task, and sends the results back to the JobTracker.

Map()

A Map Task in MapReduce is performed using the Map() function. This part of MapReduce is responsible for processing one or more chunks of data and producing the output results.

Reduce()

The next stage of the MapReduce programming model is the Reduce() function. This part of MapReduce is responsible for consolidating the results produced by each of the Map() tasks.

Data Locality

MapReduce tries to place the data and the compute as close as possible. First, it tries to put the compute on the same node where the data resides; if that cannot be done (because the node is down, busy with other computation, etc.), it tries to put the compute on the node nearest to the data node(s) containing the data to be processed. This feature of MapReduce is known as "Data Locality".

How MapReduce Works

The following diagram shows the logical flow of a MapReduce programming model.
[Figure: Hadoop - MapReduce - Logical Data Flow]
Let us understand each of the stages depicted in the above diagram.
  • Input: This is the input data / file to be processed.
  • Split: Hadoop splits the incoming data into smaller pieces called "splits".
  • Map: In this step, MapReduce processes each split according to the logic defined in the map() function. Each mapper works on one split at a time. Each mapper is treated as a task, and multiple tasks are executed across different TaskTrackers and coordinated by the JobTracker.
  • Combine: This is an optional step, used to improve performance by reducing the amount of data transferred across the network. The combiner performs the same logic as the reduce step and aggregates the output of the map() function before it is passed to the subsequent steps.
  • Shuffle & Sort: In this step, the outputs from all the mappers are shuffled, sorted to put them in order, and grouped before being sent to the next step.
  • Reduce: This step aggregates the outputs of the mappers using the reduce() function. The output of the reducer is sent to the next and final step. Each reducer is treated as a task, and multiple tasks are executed across different TaskTrackers and coordinated by the JobTracker.
  • Output: Finally, the output of the reduce step is written to a file in HDFS.
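To make these stages concrete, here is a minimal sketch of how a driver might wire them together using Hadoop's org.apache.hadoop.mapreduce API. The WordCountMapper and WordCountReducer class names are illustrative placeholders (a sketch of them appears at the end of the word count example below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCountDriver.class);

    // Input + Split: the input format reads args[0] and splits it.
    FileInputFormat.addInputPath(job, new Path(args[0]));

    job.setMapperClass(WordCountMapper.class);      // Map
    job.setCombinerClass(WordCountReducer.class);   // Combine (optional)
    job.setReducerClass(WordCountReducer.class);    // Shuffle & Sort happen automatically, then Reduce

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Output: results are written to a directory in HDFS.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}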

MapReduce Word Count Example

For the purpose of understanding MapReduce, let us consider a simple example. Let us assume that we have a file which contains the following four lines of text.
[Figure: Hadoop - MapReduce - Word Count Example - Input File]
In this file, we need to count the number of occurrences of each word. For instance, DW appears twice, BI appears once, SSRS appears twice, and so on. Let us see how this counting operation is performed when this file is input to MapReduce.
Below is a simplified representation of the data flow for Word Count Example.
[Figure: Hadoop - MapReduce - Word Count Example - Data Flow]
  • Input: In this step, the sample file is input to MapReduce.
  • Split: In this step, Hadoop splits / divides our sample input file into four parts, each made up of one line from the input file. Note that, for the purpose of this example, we are treating each line as a split. However, this is not necessarily how splits are formed in a real-world scenario.
  • Map: In this step, each split is fed to a mapper, which is the map() function containing the logic on how to process the input data - in our case, the line of text present in the split. For our scenario, the map() function contains the logic to count the occurrences of each word, and each occurrence is captured / arranged as a (key, value) pair, such as (SQL, 1), (DW, 1), (SQL, 1), and so on.
  • Combine: This is an optional step, often used to improve performance by reducing the amount of data transferred across the network. It is essentially the same as the reducer (the reduce() function) and acts on the output of each mapper. In our example, the key-value pairs from the first mapper "(SQL, 1), (DW, 1), (SQL, 1)" are combined, and the output of the corresponding combiner becomes "(SQL, 2), (DW, 1)".
  • Shuffle and Sort: In this step, the output of all the mappers is collected, shuffled, sorted, and arranged to be sent to the reducer.
  • Reduce: In this step, the collective data from the various mappers, after being shuffled and sorted, is combined / aggregated, and the word counts are produced as (key, value) pairs like (BI, 1), (DW, 2), (SQL, 5), and so on.
  • Output: In this step, the output of the reducer is written to a file on HDFS. The following image is the output of our word count example.
[Figure: Hadoop - MapReduce - Word Count Example - Output File]
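To round out the example, here is a minimal sketch of what the map() and reduce() logic above could look like in Java, using the org.apache.hadoop.mapreduce API. The class names match the driver sketch earlier and are illustrative rather than production code (each class would live in its own .java file):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Emit (word, 1) for every word in the line, e.g. (SQL, 1), (DW, 1), ...
    for (String token : line.toString().split("\\s+")) {
      if (!token.isEmpty()) {
        word.set(token);
        context.write(word, ONE);
      }
    }
  }
}

public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    // Sum the counts for each word, producing pairs like (SQL, 5).
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    context.write(word, new IntWritable(sum));
  }
}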

Highlights of Hadoop MapReduce

Here are a few highlights of the MapReduce programming model in Hadoop:
  • MapReduce works in a master-slave / master-worker fashion. JobTracker acts as the master and TaskTrackers act as the slaves.
  • MapReduce has two major phases - A Map phase and a Reduce phase. Map phase processes parts of input data using mappers based on the logic defined in the map() function. The Reduce phase aggregates the data using a reducer based on the logic defined in the reduce() function.
  • Depending upon the problem at hand, we can have One Reduce Task, Multiple Reduce Tasks or No Reduce Tasks.
  • MapReduce has built-in fault tolerance and hence can run on commodity hardware.
  • MapReduce takes care of distributing the data across various nodes, assigning the tasks to each of the nodes, getting the results back from each node, re-running the task in case of any node failures, consolidation of results, etc.
  • MapReduce processes data in the form of (Key, Value) pairs. Hence, we need to fit our business problem into this key-value arrangement.

Next Steps

  • Explore more about Big Data and Hadoop
  • In the next and subsequent tips, we will look at the other aspects of Hadoop and the Big Data world. So stay tuned!

 

Saturday 5 July 2014

7 Tips for Improving MapReduce Performance

Since MapReduce and HDFS are complex distributed systems that run arbitrary user code, there’s no hard and fast set of rules to achieve optimal performance; instead, I tend to think of tuning a cluster or job much like a doctor would treat a sick human being. There are a number of key symptoms to look for, and each set of symptoms leads to a different diagnosis and course of treatment.

In medicine, there’s no automatic process that can replace the experience of a well seasoned doctor. The same is true with complex distributed systems — experienced users and operators often develop a “sixth sense” for common issues. Having worked with BigData customers in a number of different industries, each with a different workload, dataset, and cluster hardware, I’ve accumulated a bit of this experience, and would like to share some with you today.

In this blog post, I’ll highlight a few tips for improving MapReduce performance. The first few tips are cluster-wide, and will be useful for operators and developers alike. The latter tips are for developers writing custom MapReduce jobs in Java. For each tip, I’ll also note a few of the “symptoms” or “diagnostic tests” that indicate a particular remedy might bring you some good improvements.

Please note, also, that these tips contain lots of rules of thumb based on my experience across a variety of situations. They may not apply to your particular workload, dataset, or cluster, and you should always benchmark your jobs before and after any changes. For these tips, I’ll show some comparative numbers for a 40GB wordcount job on a small 4-node cluster. Tuned optimally, each of the map tasks in this job runs in about 33 seconds, and the total job runtime is about 8m30s.

Tip 1) Configure your cluster correctly

Diagnostics/symptoms:
  • top shows slave nodes fairly idle even when all map and reduce task slots are filled up running jobs.
  • top shows kernel processes like RAID (mdX_raid*) or pdflush taking most of the CPU time.
  • Linux load averages are often more than twice the number of CPUs on the system.
  • Linux load averages stay less than half the number of CPUs on the system, even when running jobs.
  • Any swap usage on nodes beyond a few MB.
The first step to optimizing your MapReduce performance is to make sure your cluster configuration has been tuned. For starters, check out our earlier blog post on configuration parameters. In addition to those knobs in the Hadoop configuration, here are a few more checklist items you should go through before beginning to tune the performance of an individual job:
  • Make sure the mounts you’re using for DFS and MapReduce storage have been mounted with the noatime option. This disables access time tracking and can improve IO performance.
  • Avoid RAID and LVM on TaskTracker and DataNode machines – it generally reduces performance.
  • Make sure you’ve configured mapred.local.dir and dfs.data.dir to point to one directory on each of your disks to ensure that all of your IO capacity is used. Run iostat -dx 5 from the sysstat package while the cluster is loaded to make sure each disk shows utilization.
  • Ensure that you have SMART monitoring for the health status of your disk drives. MapReduce jobs are fault tolerant, but dying disks can cause performance to degrade as tasks must be re-executed. If you find that a particular TaskTracker becomes blacklisted on many job invocations, it may have a failing drive.
  • Monitor and graph swap usage and network usage with software like Ganglia. Monitoring Hadoop metrics in Ganglia is also a good idea. If you see swap being used, reduce the amount of RAM allocated to each task in mapred.child.java.opts.
Benchmarks:
Unfortunately I was not able to perform benchmarks for this tip, as it would involve re-imaging the cluster. If you have had relevant experience, feel free to leave a note in the Comments section below.

Tip 2) Use LZO Compression

Diagnostics/symptoms:
  • This is almost always a good idea for intermediate data! In the doctor analogy, consider LZO compression your vitamins.
  • Output data size of MapReduce job is nontrivial.
  • Slave nodes show high iowait utilization in top and iostat when jobs are running.
Almost every Hadoop job that generates a non-negligible amount of map output will benefit from intermediate data compression with LZO. Although LZO adds a little bit of CPU overhead, the reduced amount of disk IO during the shuffle will usually save time overall.
Whenever a job needs to output a significant amount of data, LZO compression can also increase performance on the output side. Since writes are replicated 3x by default, each GB of output data you save will save 3GB of disk writes.
In order to enable LZO compression, check out our recent guest blog from Twitter. Be sure to set mapred.compress.map.output to true.
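As a sketch of what that configuration can look like in the job driver, assuming the hadoop-lzo libraries are installed on the cluster (the codec class names come from the hadoop-lzo project; verify them against your installation):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Compress intermediate map output with LZO.
conf.setBoolean("mapred.compress.map.output", true);
conf.set("mapred.map.output.compression.codec",
    "com.hadoop.compression.lzo.LzoCodec");
// Optionally compress the final job output as well.
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec",
    "com.hadoop.compression.lzo.LzopCodec");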
Benchmarks:
Disabling LZO compression on the wordcount example increased the job runtime only slightly on our cluster. The FILE_BYTES_WRITTEN counter increased from 3.5GB to 9.2GB, showing that the compression yielded a 62% decrease in disk IO. Since this job was not sharing the cluster, and each node has a high ratio of number of disks to number of tasks, IO is not the bottleneck here, and thus the improvement was not substantial. On clusters where disks are pegged due to a lot of concurrent activity, a 60% reduction in IO can yield a substantial improvement in job completion speed.

Tip 3) Tune the number of map and reduce tasks appropriately

Diagnostics/symptoms:
  • Each map or reduce task finishes in less than 30-40 seconds.
  • A large job does not utilize all available slots in the cluster.
  • After most mappers or reducers are scheduled, one or two remain pending and then run all alone.
Tuning the number of map and reduce tasks for a job is important and easy to overlook. Here are some rules of thumb I use to set these parameters:
  • If each task takes less than 30-40 seconds, reduce the number of tasks. The task setup and scheduling overhead is a few seconds, so if tasks finish very quickly, you’re wasting time while not doing work. JVM reuse can also be enabled to solve this problem.
  • If a job has more than 1TB of input, consider increasing the block size of the input dataset to 256M or even 512M so that the number of tasks will be smaller. You can change the block size of existing files with a command like hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with-largeblocks. After this command completes, you can remove the original data.
  • So long as each task runs for at least 30-40 seconds, increase the number of mapper tasks to some multiple of the number of mapper slots in the cluster. If you have 100 map slots in your cluster, try to avoid having a job with 101 mappers – the first 100 will finish at the same time, and then the 101st will have to run alone before the reducers can run. This is more important on small clusters and small jobs.
  • Don’t schedule too many reduce tasks – for most jobs, we recommend a number of reduce tasks equal to or a bit less than the number of reduce slots in the cluster.
Benchmarks:
To make the wordcount job run with too many tasks, I ran it with the argument -Dmapred.max.split.size=$[16*1024*1024]. This yielded 2640 tasks instead of the 360 that the framework chose by default. When running with this setting, each task took about 9 seconds, and watching the Cluster Summary view on the JobTracker showed the number of running maps fluctuating between 0 and 24 continuously throughout the job. The entire job finished in 17m52s, more than twice as slow as the original job.

Tip 4) Write a Combiner

Diagnostics/symptoms:
  • A job performs aggregation of some sort, and the Reduce input groups counter is significantly smaller than the Reduce input records counter.
  • The job performs a large shuffle (e.g. map output bytes is multiple GB per node)
  • The number of spilled records is many times larger than the number of map output records as seen in the Job counters.
If your algorithm involves computing aggregates of any sort, chances are you can use a Combiner in order to perform some kind of initial aggregation before the data hits the reducer. The MapReduce framework runs combiners intelligently in order to reduce the amount of data that has to be written to disk and transferred over the network between the Map and Reduce stages of computation.
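For an aggregation like word count, the reduce logic is associative and commutative, so the reducer class can typically be registered as the combiner too. A minimal sketch (class names illustrative):

// In the job driver: run the reduce logic on each mapper's output first.
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);

Note that a combiner must accept and emit the same key/value types as the map output, and may run zero, one, or several times per key, so the operation has to be safe to apply repeatedly.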
Benchmarks:
I modified the word count example to remove the call to setCombinerClass, and otherwise left it the same. This changed the average map task run time from 33s to 48s, and increased the amount of shuffled data from 1GB to 1.4GB. The total job runtime increased from 8m30s to 15m42s, nearly a factor of two. Note that this benchmark was run with map output compression enabled – without map output compression, the effect of the combiner would have been even more important.

Tip 5) Use the most appropriate and compact Writable type for your data

Symptoms/diagnostics:
  • Text objects are used for working with non-textual or complex data
  • IntWritable or LongWritable objects are used when most output values tend to be significantly smaller than the maximum value.
When users are new to programming in MapReduce, or are switching from Hadoop Streaming to Java MapReduce, they often use the Text writable type unnecessarily. Although Text can be convenient, converting numeric data to and from UTF8 strings is inefficient and can actually make up a significant portion of CPU time. Whenever dealing with non-textual data, consider using the binary Writables like IntWritable, FloatWritable, etc.
In addition to avoiding the text parsing overhead, the binary Writable types will take up less space as intermediate data. Since disk IO and network transfer will become a bottleneck in large jobs, reducing the sheer number of bytes taken up by the intermediate data can provide a substantial performance gain. When dealing with integers, it can also sometimes be faster to use VIntWritable or VLongWritable — these implement variable-length integer encoding which saves space when serializing small integers. For example, the value 4 will be serialized in a single byte, whereas the value 10000 will be serialized in two. These variable length numbers can be very effective for data like counts, where you expect that the majority of records will have a small number that fits in one or two bytes.
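If you want to see the space savings for yourself, here is a small standalone sketch using Hadoop's WritableUtils (exact byte counts depend on the VInt encoding format):

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;

public class VIntSizeDemo {
  public static void main(String[] args) throws Exception {
    DataOutputBuffer buf = new DataOutputBuffer();
    WritableUtils.writeVInt(buf, 4);
    int smallBytes = buf.getLength();              // small values fit in one byte
    WritableUtils.writeVInt(buf, 10000);
    int largeBytes = buf.getLength() - smallBytes; // larger values take a few more
    System.out.println("4 -> " + smallBytes + " byte(s), 10000 -> " + largeBytes + " byte(s)");
  }
}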
If the Writable types that ship with Hadoop don’t fit the bill, consider writing your own. It’s pretty simple, and will be significantly faster than parsing text. If you do so, make sure to provide a RawComparator — see the source code for the built-in Writables for an example.
Along the same vein, if your MapReduce job is part of a multistage workflow, use a binary format like SequenceFile for the intermediate steps, even if the last stage needs to output text. This will reduce the amount of data that needs to be materialized along the way.
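A sketch of what that looks like for a two-stage workflow, using the SequenceFile input/output formats from org.apache.hadoop.mapreduce.lib (the stage1 and stage2 Job variables are placeholders):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Stage 1 writes binary intermediate output...
stage1.setOutputFormatClass(SequenceFileOutputFormat.class);
// ...which stage 2 reads back directly, with no text parsing involved.
stage2.setInputFormatClass(SequenceFileInputFormat.class);
stage2.setOutputFormatClass(TextOutputFormat.class); // only the final stage emits text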
Benchmarks:
For the example word count job, I modified the intermediate count values to be of Text type rather than IntWritable. In the reducer, I used Integer.parseInt(value.toString()) when accumulating the sum. The suboptimal version of WordCount was about 10% slower than the original. The full job ran in a bit over 9 minutes, and each map task took 36 seconds instead of the original 33. Since integer parsing is itself rather fast, this did not represent a large difference; in the general case, I have seen more efficient Writables make as much as a 2-3x difference in performance.

Tip 6) Reuse Writables

Symptoms/diagnostics:
  • Add -verbose:gc -XX:+PrintGCDetails to mapred.child.java.opts. Then inspect the logs for some tasks. If garbage collection is frequent and represents a lot of time, you may be allocating unnecessary objects.
  • grep for “new Text” or “new IntWritable” in your code base. If you find this in an inner loop, or inside the map or reduce functions this tip may help.
  • This tip is especially helpful when your tasks are constrained in RAM.
One of the first mistakes that many MapReduce users make is to allocate a new Writable object for every output from a mapper or reducer. For example, one might implement a word-count mapper like this:
public void map(...) {
  ...
  for (String word : words) {
    output.collect(new Text(word), new IntWritable(1));
  }
}
This implementation causes thousands of very short-lived objects to be allocated. While the Java garbage collector does a reasonable job at dealing with this, it is more efficient to write:
class MyMapper ... {
  Text wordText = new Text();
  IntWritable one = new IntWritable(1);
  public void map(...) {
    ...
    for (String word : words) {
      wordText.set(word);
      output.collect(wordText, one);
    }
  }
}
Benchmarks:
When I modified the word count example as described above, I initially found it made no difference in the run time of the job. This is because this cluster’s default settings include a 1GB heap size for each task, so garbage collection never ran. However, running it with each task allocated only 200MB of heap showed a drastic slowdown in the version that did not reuse Writables — the total job runtime increased from around 8m30s to over 17 minutes. The original version, which does reuse Writables, stayed the same speed even with the smaller heap. Since reusing Writables is an easy fix, I recommend always doing so – it may not bring you a gain for every job, but if you’re low on memory it can make a huge difference.
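For reference, the per-task heap used in that experiment is controlled through mapred.child.java.opts; a sketch combining the smaller heap with the GC logging flags suggested in the diagnostics above (conf is the job's Configuration object):

// Shrink each task's heap, as in the benchmark, and log GC activity.
conf.set("mapred.child.java.opts", "-Xmx200m -verbose:gc -XX:+PrintGCDetails");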

Tip 7) Use “Poor Man’s Profiling” to see what your tasks are doing

This is a trick I almost always use when first looking at the performance of a MapReduce job. Profiling purists will disagree and say that this won’t work, but you can’t argue with results!
In order to do what I call “poor man’s profiling”, ssh into one of your slave nodes while some tasks from a slow job are running. Then simply run sudo killall -QUIT java 5-10 times in a row, each a few seconds apart. Don’t worry — this doesn’t cause anything to quit, despite the name. Then, use the JobTracker interface to navigate to the stdout logs for one of the tasks that’s running on this node, or look in /var/log/hadoop/userlogs/ for a stdout file of a task that is currently running. You’ll see stack trace output from each time you sent the SIGQUIT signal to the JVM.
It takes a bit of experience to parse this output, but here’s the method I usually use:
  1. For each thread in the trace, quickly scan for the name of your Java package (e.g. com.mycompany.mrjobs). If you don’t see any lines in the trace that are part of your code, skip over this thread.
  2. When you find a stack trace that has some of your code in it, make a quick mental note what it’s doing. For example, “something NumberFormat-related” is all you need at this point. Don’t worry about specific line numbers yet.
  3. Go down to the next dump you took a few seconds later in the logs. Perform the same process here and make a note.
  4. After you’ve gone through 4-5 of the traces, you might notice that the same vague thing shows up in every one of them. If that thing is something that you expect to be fast, you probably found your culprit. If you take 10 traces, and 5 of them show NumberFormat in the dump, it means that you’re spending somewhere around 50% of your CPU time formatting numbers, and you might consider doing something differently.
Sure, this method isn’t as scientific as using a real profiler on your tasks, but I’ve found that it’s a surefire way to notice any glaring CPU bottlenecks very quickly and with no setup involved. It’s also a technique that you’ll get better at with practice as you learn what a normal dump looks like and when something jumps out as odd.
Here are a few performance mistakes I often find through this technique:
  • NumberFormat is slow – avoid it where possible.
  • String.split, as well as encoding or decoding UTF8, is slower than you think – see the above tips about using the appropriate Writables
  • Concatenating Strings rather than using StringBuffer.append

These are just a few tips for improving MapReduce performance. If you have your own tips and tricks for profiling and optimizing MapReduce jobs, please leave a comment below! If you’d like to look at the code I used for running the benchmarks, I’ve put it online at http://github.com/toddlipcon/performance-blog-code/

Appendix: Benchmark Cluster Setup
Each node in the cluster is a dual quad-core Nehalem box with hyperthreading enabled, 24G of RAM and 12x1TB disks. The TaskTrackers are configured with 6 map and 6 reduce slots, slightly lower than we normally recommend since we sometimes run multiple clusters at once on these boxes for testing.

Process Small Files on Hadoop Using CombineFileInputFormat

From Cloudera’s blog:
A small file is one which is significantly smaller than the HDFS block size (default 64MB). If you’re storing small files, then you probably have lots of them (otherwise you wouldn’t turn to Hadoop), and the problem is that HDFS can’t handle lots of files.
In my benchmark, just using a custom CombineFileInputFormat can speedup the program from 3 hours to 23 minutes, and after some further tuning, the same task can be run in 6 minutes!

Benchmark Setup

To test the raw performance of different approaches to the small-files problem, I set up a map-only Hadoop job that basically just does a grep and performs a small binary search. The binary search part generates the reduce-side keys that I'll use in further data processing; it takes very little resource (an 8MB index) to run, so it does not affect the result of the benchmark.
The data to process is server log data, 53.1 GB in total. The Hadoop cluster consists of 6 nodes, running Hadoop version 1.1.2. In this benchmark I implemented CombineFileInputFormat to shrink the number of map tasks; I also tested the effect of reusing the JVM or not, and of different block sizes for combining files.

CombineFileInputFormat

The code listed here is modified from Hadoop example code. To use CombineFileInputFormat you need to implement three classes. The class CombineFileInputFormat is an abstract class with no implementation, so you must create a subclass to support it; we'll name the subclass CFInputFormat. The subclass will instantiate a delegate CFRecordReader that extends RecordReader; this is the code that does the file-processing logic. We'll also need a class for FileLineWritable, which replaces the LongWritable normally used as the key for file lines.

CFInputFormat.java

The CFInputFormat.java doesn’t do much. You implement createRecordReader to pass in the record reader that does the combine file logic, that’s all. Note that you can call setMaxSplitSize in the initializer to control the size of each chunk of files; if you don’t want to split files into half, remember to return false in isSplitable method, which defaults to true.


package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import com.orienit.kalyan.hadoop.training.combinefiles.CFRecordReader;
import com.orienit.kalyan.hadoop.training.combinefiles.FileLineWritable;

public class CFInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {
  public CFInputFormat(){
    super();
    setMaxSplitSize(67108864); // 64 MB, default block size on hadoop
  }
  public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException{
    return new CombineFileRecordReader<FileLineWritable, Text>((CombineFileSplit)split, context, CFRecordReader.class);
  }
  @Override
  protected boolean isSplitable(JobContext context, Path file){
    return false;
  }
}

CFRecordReader.java

CFRecordReader is a delegate class of CombineFileRecordReader, a built-in class that passes each split (typically a whole file in this case) to our class CFRecordReader. When the Hadoop job starts, CombineFileRecordReader reads the sizes of all the files in HDFS that we want to process and decides how many splits to create based on the MaxSplitSize we defined in CFInputFormat. For every split (which must be a whole file, because we set isSplitable to false), CombineFileRecordReader creates a CFRecordReader instance via a custom constructor, passing in the CombineFileSplit, the context, and an index that CFRecordReader uses to locate the file to process.
When processing a file, CFRecordReader creates a FileLineWritable as the key for the Hadoop mapper class. For each line, the FileLineWritable consists of the file name and the offset of that line. The difference between FileLineWritable and the LongWritable normally used as the mapper key is that LongWritable only denotes the offset of a line within a file, while FileLineWritable adds the file information to the key.


package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.IOException;
import com.orienit.kalyan.hadoop.training.combinefiles.FileLineWritable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;


public class CFRecordReader extends RecordReader<FileLineWritable, Text>{
  private long startOffset;
  private long end;
  private long pos;
  private FileSystem fs;
  private Path path;
  private FileLineWritable key;
  private Text value;

  private FSDataInputStream fileIn;
  private LineReader reader;

public CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException{
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);

  fileIn = fs.open(path);
  reader = new LineReader(fileIn);
  this.pos = startOffset;
}

@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
    throws IOException, InterruptedException {
  // Won't be called, use custom Constructor
  // `CFRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)`
  // instead
}

@Override
public void close() throws IOException {}

@Override
public float getProgress() throws IOException{
  if (startOffset == end) {
    return 0;
  }
  return Math.min(1.0f, (pos - startOffset) / (float) (end - startOffset));
}

@Override
public FileLineWritable getCurrentKey() throws IOException, InterruptedException {
  return key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
  return value;
}

@Override
public boolean nextKeyValue() throws IOException{
  if (key == null) {
    key = new FileLineWritable();
    key.fileName = path.getName();
  }
  key.offset = pos;
  if (value == null){
    value = new Text();
  }
  int newSize = 0;
  if (pos < end) {
    newSize = reader.readLine(value);
    pos += newSize;
  }
  if (newSize == 0) {
    key = null;
    value = null;
    return false;
  } else{
    return true;
  }
}
}
The need for this custom constructor is not documented anywhere in the Hadoop API or its documentation. You can only find it in the Hadoop source code, line 40:


   static final Class [] constructorSignature = new Class []
                                          {CombineFileSplit.class,
                                           TaskAttemptContext.class,
                                           Integer.class};

FileLineWritable.java

This class is very simple: it stores the file name and offset, and overrides the compareTo method to compare the file name first, then the offset.


package com.orienit.kalyan.hadoop.training.combinefiles;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class FileLineWritable implements WritableComparable<FileLineWritable>{
  public long offset;
  public String fileName;

  public void readFields(DataInput in) throws IOException {
    this.offset = in.readLong();
    this.fileName = Text.readString(in);
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(offset);
    Text.writeString(out, fileName);
  }

  public int compareTo(FileLineWritable that) {
    int cmp = this.fileName.compareTo(that.fileName);
    if (cmp != 0) return cmp;
    return (int)Math.signum((double)(this.offset - that.offset));
  }

  @Override
  public int hashCode() {               // generated hashCode()
    final int prime = 31;
    int result = 1;
    result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
    result = prime * result + (int) (offset ^ (offset >>> 32));
    return result;
  }

  @Override
  public boolean equals(Object obj) {  // generated equals()
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    FileLineWritable other = (FileLineWritable) obj;
    if (fileName == null) {
      if (other.fileName != null)
        return false;
    } else if (!fileName.equals(other.fileName))
      return false;
    if (offset != other.offset)
      return false;
    return true;
  }
}

Job Setup

Finally, here is the job setup for the Hadoop cluster. We just need to assign the classes to the job:


import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// standard hadoop conf
Job job = new Job(getConf());
FileInputFormat.addInputPath(job, new Path(args[0]));
job.setInputFormatClass(CFInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0); // map only
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.submit();


I ran several benchmarks and tuned the performance from 3 hours 34 minutes to 6 minutes 8 seconds!

Original job without any tuning

  • job_201406051010_0001
  • NumTasks: 9790
  • Reuse JVM: false
  • mean complete time: 05-Jul-2014 10:08:47 (17sec)
  • Finished in: 3hrs, 34mins, 26sec
We had 9790 files to process, and the total size of the files was 53 GB. Note that each task still took around 17 seconds just to process its file.

Using CombineFileInputFormat without setting the MaxSplitSize

  • job_201406051010_0002
  • NumTasks: 1
  • Reuse JVM: false
In this benchmark I didn’t set the MaxSplitSize in CFInputFormat.java, and thus Hadoop merge all the files into one super big task. After running this task for 15 minutes, hadoop killed it. Maybe its a timeout issue, I didn’t dig into this. The start and the end of the task logs look like this:
14/06/05 16:17:29 INFO mapred.JobClient:  map 0% reduce 0%
14/06/05 16:32:45 INFO mapred.JobClient:  map 40% reduce 0%
 
14/06/05 16:33:02 INFO mapred.JobClient: Task Id : attempt_201406051010_0002_m_000000_0, Status : FAILED
java.lang.Throwable: Child Error
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:271)
    Caused by: java.io.IOException: Task process exit with nonzero status of 255.
    at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:258)

Using CombineFileInputFormat with block size 64 MB

  • job_201406051010_0003
  • Reuse JVM = false
  • max split size = 64MB
  • NumTasks: 760
  • mean complete time: 05-Jul-2014 16:55:02 (24sec)
  • Finished in: 23mins, 6sec
After setting MaxSplitSize, the total runtime dropped to 23 minutes! The task count drops from 9790 to 760, about 12 times smaller, and the job runs about 9.3 times faster - pretty nice! However, the mean task completion time doesn't scale by the same factor. The reason is the large overhead of starting a JVM over and over again.

Using CombineFileInputFormat with block size 64MB and reuse JVM

To reuse the JVM, just set mapred.job.reuse.jvm.num.tasks to -1:


  public static void main(String[] argv) throws Exception {
    Configuration conf = new Configuration();
    conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
    int res = ToolRunner.run(conf, new HadoopMain(), argv);
    System.exit(res);
  }
The result is awesome! 6 minutes and 8 seconds, wow!
  • job_201406051010_0004
  • Reuse JVM = true
  • max split size = 64MB
  • NumTasks: 760
  • mean complete time: 05-Jul-2014 17:30:23 (5sec)
  • Finished in: 6mins, 8sec

Use FileInputFormat and reuse JVM

Just curious about the performance difference if we only change the JVM reuse parameter:
  • job_201406051010_0005
  • NumTasks: 9790
  • mean complete time: 05-Jul-2014 17:04:18 (3sec)
  • Reuse JVM = true
  • Finished in: 24mins, 49sec

Tuning performance over block size

Let’s jump to the conclusion first: changing the block size doesn’t affect the performance that much, and I found 64 MB is the best size to use. Here are the benchmarks:

512 MB

  • job_201406051010_0006
  • Reuse JVM = true
  • max split size = 512MB
  • NumTasks: 99
  • mean complete time: 05-Jul-2014 11:55:26 (24sec)
  • Finished in: 7min 13sec

128 MB

  • job_201406051010_0007
  • Reuse JVM = true
  • max split size = 128 MB
  • NumTasks: 341
  • mean complete time: 05-Jul-2014 13:13:20 (9sec)
  • Finished in: 6mins, 41sec

Conclusion

So far, the best practices I have learned from these benchmarks are:
  1. Set the mapred.job.reuse.jvm.num.tasks flag in the configuration. This is the easiest tuning to do, and it yields nearly a 10x performance improvement.
  2. Write your own CombineFileInputFormat implementation.
  3. The block size can be 64 MB or 128 MB; there isn't a big difference between the two.
Still, try to model your problem as a SequenceFile or MapFile in Hadoop; HDFS should handle locality for these files automatically. What about CFInputFormat? Does it handle locality in HDFS too? I can't confirm it, but I guess sorting the keys by file name first and then by offset also helps preserve locality when assigning data to mappers. When I have time to dig more into the HDFS API, I'll revisit this benchmark and see what else I can tune in the program.

Wednesday 18 June 2014

MapReduce Scheduling Information

JobTracker and TaskTracker: the MapReduce engine

Above the Hadoop distributed file system sits the MapReduce engine, which consists of one JobTracker, to which client applications submit MapReduce jobs. The JobTracker pushes work out to available TaskTracker nodes in the cluster, striving to keep the work as close to the data as possible. With a rack-aware file system, the JobTracker knows which node contains the data and which other machines are nearby. If the work cannot be hosted on the actual node where the data resides, priority is given to nodes in the same rack. This reduces traffic on the main backbone network. If a TaskTracker fails or times out, that part of the job is rescheduled. The TaskTracker on each node spawns a separate Java Virtual Machine process to prevent the TaskTracker itself from failing if the running job crashes its JVM. A heartbeat is sent from the TaskTracker to the JobTracker every few minutes to check its status. The JobTracker and TaskTracker status and information are exposed by Jetty and can be viewed from a web browser.
If the JobTracker failed on Hadoop 0.20 or earlier, all ongoing work was lost. Hadoop version 0.21 added some checkpointing to this process; the JobTracker records what it is up to in the file system. When a JobTracker starts up, it looks for any such data, so that it can restart work from where it left off.
Known limitations of this approach are:
  • The allocation of work to TaskTrackers is very simple. Every TaskTracker has a number of available slots (such as "4 slots"). Every active map or reduce task takes up one slot. The JobTracker allocates work to the tracker nearest to the data with an available slot. There is no consideration of the current system load of the allocated machine, and hence of its actual availability.
  • If one TaskTracker is very slow, it can delay the entire MapReduce job – especially towards the end of a job, where everything can end up waiting for the slowest task. With speculative execution enabled, however, a single task can be executed on multiple slave nodes (see the sketch below).
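For reference, speculative execution is toggled per job through configuration; a sketch using the classic (pre-YARN) property names:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Launch backup copies of straggling map/reduce tasks on other nodes.
conf.setBoolean("mapred.map.tasks.speculative.execution", true);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", true);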

Scheduling

By default Hadoop uses FIFO scheduling, with 5 optional scheduling priorities, to schedule jobs from a work queue. In version 0.19 the job scheduler was refactored out of the JobTracker, adding the ability to use an alternate scheduler (such as the Fair scheduler or the Capacity scheduler).
Fair scheduler
The fair scheduler was developed by Facebook. The goal of the fair scheduler is to provide fast response times for small jobs and QoS for production jobs. The fair scheduler has three basic concepts.
  1. Jobs are grouped into Pools.
  2. Each pool is assigned a guaranteed minimum share.
  3. Excess capacity is split between jobs.
By default, jobs that are uncategorized go into a default pool. Pools have to specify the minimum number of map slots, reduce slots, and a limit on the number of running jobs.
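A sketch of pointing the JobTracker at the fair scheduler via configuration (property names from the classic MapReduce fair scheduler; the allocation file path is an assumption). In practice these would be set in mapred-site.xml on the JobTracker rather than per job:

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Swap the default FIFO scheduler for the fair scheduler.
conf.set("mapred.jobtracker.taskScheduler",
    "org.apache.hadoop.mapred.FairScheduler");
// Pools and their minimum shares are defined in an allocations file (path illustrative).
conf.set("mapred.fairscheduler.allocation.file",
    "/etc/hadoop/conf/fair-scheduler.xml");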
Capacity scheduler
The capacity scheduler was developed by Yahoo. The capacity scheduler supports several features that are similar to the fair scheduler.
  • Jobs are submitted into queues.
  • Queues are allocated a fraction of the total resource capacity.
  • Free resources are allocated to queues beyond their total capacity.
  • Within a queue a job with a high level of priority has access to the queue's resources.
There is no preemption once a job is running.

Other applications

The HDFS file system is not restricted to MapReduce jobs. It can be used for other applications, many of which are under development at Apache. The list includes the HBase database, the Apache Mahout machine learning system, and the Apache Hive data warehouse system. Hadoop can in theory be used for any sort of work that is batch-oriented rather than real-time, that is very data-intensive, and that is able to work on pieces of the data in parallel. As of October 2009, commercial applications of Hadoop included:
  • Log and/or clickstream analysis of various kinds
  • Marketing analytics
  • Machine learning and/or sophisticated data mining
  • Image processing
  • Processing of XML messages
  • Web crawling and/or text processing
  • General archiving, including of relational/tabular data, e.g. for compliance