Wednesday 20 November 2013

How to change the default key-value separator of a MapReduce job

TextOutputFormat
==============================
The default MapReduce output format, TextOutputFormat, writes records as lines of text. Its keys and values may be of any type, since TextOutputFormat turns them to strings by calling toString() on them.

Each key-value pair is separated by a tab character by default. We can change this separator to a character of our choice using the mapred.textoutputformat.separator property (in the newer MapReduce API the equivalent property is mapreduce.output.textoutputformat.separator).

To do this, add one of the following lines to your job's driver code:

Configuration conf = new Configuration();

// the default separator is a tab character
conf.set("mapred.textoutputformat.separator", "\t");

// to use a comma instead
conf.set("mapred.textoutputformat.separator", ",");

// to use a colon instead
conf.set("mapred.textoutputformat.separator", ":");

  

KeyValueTextInputFormat
=================================
 
TextInputFormat’s keys, being simply the offset within the file, are not normally very useful. It is common for each line in a file to be a key-value pair, separated by a delimiter such as a tab character. 

For example, this is the output produced by TextOutputFormat, Hadoop’s default OutputFormat. To interpret such files correctly, KeyValueTextInputFormat is appropriate.

You can specify the separator via the mapreduce.input.keyvaluelinerecordreader.key.value.separator property (in the older MapReduce API this was key.value.separator.in.input.line). It is a tab character by default.


Consider the following input file, where → represents a (horizontal) tab character:

line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.


With the default (tab) separator, KeyValueTextInputFormat turns this file into the records (line1, On the top of the Crumpetty Tree), (line2, The Quangle Wangle sat,), and so on: the key is the text before the first tab and the value is the rest of the line.

Configuration conf = new Configuration();

// the default separator is a tab character
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");

// to use a comma instead
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

// to use a colon instead
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");

  
NLineInputFormat
=============================
With TextInputFormat and KeyValueTextInputFormat, each mapper receives a variable number of lines of input. The number depends on the size of the split and the length of the lines. If you want your mappers to receive a fixed number of lines of input, then NLineInputFormat is the InputFormat to use. Like TextInputFormat, the keys are the byte offsets within the file and the values are the lines themselves.

N refers to the number of lines of input that each mapper receives. With N set to one (the default), each mapper receives exactly one line of input. The mapreduce.input.lineinputformat.linespermap property (in the older MapReduce API this was mapred.line.input.format.linespermap) controls the value of N.

By way of example, consider these four lines again:

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.


If, for example, N is two, then each split contains two lines. One mapper will receive the first two key-value pairs:

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)

And another mapper will receive the second two key-value pairs:

(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

The keys and values are the same as TextInputFormat produces. What is different is the way the splits are constructed.


Usually, having a map task for a small number of lines of input is inefficient (due to the overhead in task setup), but there are applications that take a small amount of input data and run an extensive (that is, CPU-intensive) computation for it, then emit their output. Simulations are a good example. By creating an input file that specifies input parameters, one per line, you can perform a parameter sweep: run a set of simulations in parallel to find how a model varies as the parameter changes.
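As a rough sketch of how N is set in practice (Hadoop 2.x API, identity mapper and reducer, placeholder class name), the driver below asks for two lines per mapper:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NLineDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "two lines per mapper");
        job.setJarByClass(NLineDriver.class);

        job.setInputFormatClass(NLineInputFormat.class);
        // N = 2: each mapper receives exactly two lines of input; this is the same
        // as setting mapreduce.input.lineinputformat.linespermap to 2
        NLineInputFormat.setNumLinesPerSplit(job, 2);

        // keys are byte offsets and values are the lines, as with TextInputFormat
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}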
 
 

Binary Input
==========================
Hadoop MapReduce is not just restricted to processing textual data—it has support for binary formats, too.

SequenceFileInputFormat
==========================
Hadoop’s sequence file format stores sequences of binary key-value pairs. They are well suited as a format for MapReduce data since they are splittable (they have sync points so that readers can synchronize with record boundaries from an arbitrary point in the file, such as the start of a split), they support compression as a part of the format, and they can store arbitrary types using a variety of serialization frameworks.
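If you want a small sequence file to experiment with, the following hedged sketch writes a few (IntWritable, Text) pairs using the classic SequenceFile.createWriter signature; the file name and record contents are arbitrary, not something defined in this post.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("demo.seq");  // arbitrary output path

        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class);
        try {
            for (int i = 1; i <= 3; i++) {
                key.set(i);
                value.set("record number " + i);
                writer.append(key, value);  // one binary key-value pair per call
            }
        } finally {
            writer.close();
        }
    }
}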

To use data from sequence files as the input to MapReduce, you use SequenceFileInputFormat. The keys and values are determined by the sequence file, and you need to make sure that your map input types correspond. 


For example, if your sequence file has IntWritable keys and Text values, then the map signature would be Mapper<IntWritable, Text, K, V>, where K and V are the types of the map’s output keys and values.
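A sketch of such a mapper might look like the following (hypothetical class name; it simply passes the (IntWritable, Text) records through, and the driver would set job.setInputFormatClass(SequenceFileInputFormat.class)):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Reads (IntWritable, Text) records from a sequence file and emits them unchanged.
public class SequenceFilePassThroughMapper
        extends Mapper<IntWritable, Text, IntWritable, Text> {
    @Override
    protected void map(IntWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}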

Note
Although its name doesn’t give it away, SequenceFileInputFormat can read MapFiles as well as sequence files. If it finds a directory where it was expecting a sequence file, SequenceFileInputFormat assumes that it is reading a MapFile and uses its data file. This is why there is no MapFileInputFormat class.


SequenceFileAsTextInputFormat
=======================================
SequenceFileAsTextInputFormat is a variant of SequenceFileInputFormat that converts the sequence file’s keys and values to Text objects. The conversion is performed by calling toString() on the keys and values. This format makes sequence files suitable input for Streaming.

SequenceFileAsBinaryInputFormat
=========================================
SequenceFileAsBinaryInputFormat is a variant of SequenceFileInputFormat that retrieves the sequence file’s keys and values as opaque binary objects. They are encapsulated as BytesWritable objects, and the application is free to interpret the underlying byte array as it pleases. Combined with SequenceFile.Writer’s appendRaw() method, this provides a way to use any binary data types with MapReduce (packaged as a sequence file), although plugging into Hadoop’s serialization mechanism is normally a cleaner alternative.

Multiple Inputs
===========================
Although the input to a MapReduce job may consist of multiple input files (constructed by a combination of file globs, filters, and plain paths), all of the input is interpreted by a single InputFormat and a single Mapper. What often happens, however, is that over time, the data format evolves, so you have to write your mapper to cope with all of your legacy formats. Or, you have data sources that provide the same type of data but in different formats. This arises in the case of performing joins of different datasets; see Reduce-Side Joins. For instance, one might be tab-separated plain text, the other a binary sequence file. Even if they are in the same format, they may have different representations and, therefore, need to be parsed differently.

These cases are handled elegantly by using the MultipleInputs class, which allows you to specify the InputFormat and Mapper to use on a per-path basis. For example, if we had weather data from the UK Met Office that we wanted to combine with the NCDC data for our maximum temperature analysis, then we might set up the input as follows:

MultipleInputs.addInputPath(conf, ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(conf, metOfficeInputPath, TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);


This code replaces the usual calls to FileInputFormat.addInputPath() and conf.setMapperClass(). Both the Met Office and NCDC data are text-based, so we use TextInputFormat for each. But the line format of the two data sources is different, so we use two different mappers: MaxTemperatureMapper reads NCDC input data and extracts the year and temperature fields, while MetOfficeMaxTemperatureMapper reads Met Office input data and extracts the same fields. The important thing is that the map outputs have the same types, since the reducers (which are all of the same type) see the aggregated map outputs and are not aware of the different mappers used to produce them.

The MultipleInputs class has an overloaded version of addInputPath() that doesn’t take a mapper:

public static void addInputPath(JobConf conf, Path path, Class<? extends InputFormat> inputFormatClass)

 
This is useful when you only have one mapper but multiple input formats.
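A hedged sketch of how that overload might be used, sticking with the old API of the signature above; the paths and the CommonRecordMapper class are placeholders, and the two formats are chosen so that both deliver (Text, Text) records to the shared mapper:

// both paths feed the same mapper, but each is parsed by its own InputFormat
MultipleInputs.addInputPath(conf, plainTextPath, KeyValueTextInputFormat.class);
MultipleInputs.addInputPath(conf, sequencePath, SequenceFileAsTextInputFormat.class);
conf.setMapperClass(CommonRecordMapper.class);  // hypothetical mapper shared by both paths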






 

Wednesday 23 October 2013

Anatomy of a MapReduce Job


Hadoop MapReduce jobs are divided into a set of map tasks and reduce tasks that run in a distributed fashion on a cluster of computers. Each task works on a small subset of the data it has been assigned, so that the load is spread across the cluster.

The input to a MapReduce job is a set of files in the data store that are spread out over HDFS. In Hadoop, these files are split with an input format, which defines how to separate a file into input splits. An input split can be thought of as a byte-oriented view of a chunk of the file, to be loaded by a map task.

The map task generally performs loading, parsing, transformation and filtering operations, whereas the reduce task is responsible for grouping and aggregating the data produced by the map tasks to generate the final output. In this way a wide range of problems can be solved with such a straightforward paradigm, from simple numerical aggregation to complex join operations and Cartesian products.

Each map task in Hadoop is broken into the following phases: record reader, mapper, combiner and partitioner. The output of the map phase, called the intermediate keys and values, is sent to the reducers. Each reduce task is broken into the following phases: shuffle, sort, reducer and output format. The map tasks are assigned by the Hadoop framework to the DataNodes where the actual data to be processed resides. This ensures that the data typically does not have to move over the network, which saves network bandwidth; because the computation runs on the machine where the data is stored, the map task is said to be data-local.
MapReduce framework diagram (source: http://highlyscalable.wordpress.com/2012/02/01/mapreduce-patterns/)

Mapper

Record Reader:

The record reader translates an input split generated by the input format into records. The purpose of the record reader is to parse the data into records, not to parse the records themselves. It passes the data to the mapper in the form of key/value pairs. Usually the key in this context is positional information and the value is the chunk of data that composes the record. In future articles we will discuss NLineInputFormat and custom record readers in more detail.

Map:

The map function is the heart of the mapper task. It is executed on each key/value pair from the record reader and produces zero or more key/value pairs, called intermediate pairs. The decision of what the key and value should be depends on what the MapReduce job is accomplishing: the data is grouped on the key, and the value is the information pertinent to the analysis in the reducer.
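As a standard illustration (not code from this post), a word-count map function emits a (word, 1) pair for every word in its input line; the class name is a placeholder:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // the record reader hands us (byte offset, line); we emit (word, 1) pairs
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
}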

Combiner:

The combiner is an optional component, but a highly useful one: it can provide a significant performance gain for a MapReduce job with no downside. The combiner is not applicable to every MapReduce algorithm, but wherever it can be applied it is recommended. It takes the intermediate keys from the mapper and applies a user-provided method to aggregate values within the small scope of that single mapper. For example, sending (hadoop, 3) requires fewer bytes than sending (hadoop, 1) three times over the network. We will cover the combiner in much more depth in future articles.
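To make the wiring concrete, here is a hedged driver sketch that reuses the reduce function as the combiner; it assumes the WordCountMapper shown above and the WordCountReducer sketched below under Reduce (both hypothetical class names) and the Hadoop 2.x API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count with combiner");
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);
        // summing counts is associative and commutative, so the reducer
        // can safely be reused as the combiner to shrink the map output
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}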

Partitioner:

The partitioner takes the intermediate key/value pairs from the mapper and splits them into shards, one shard per reducer. By default this distributes the keyspace roughly evenly over the reducers, while still ensuring that records with the same key, even if produced by different mappers, end up at the same reducer. The partitioned data is written to the local filesystem of each map task and waits to be pulled by its respective reducer.
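Hadoop's default partitioner, HashPartitioner, implements exactly this idea; its core logic is essentially the following (sketched here under a placeholder class name for illustration):

import org.apache.hadoop.mapreduce.Partitioner;

// Records with the same key hash to the same partition, and therefore the same
// reducer, no matter which mapper produced them.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}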

Reducer

Shuffle and Sort:

The reduce task starts with the shuffle and sort step. This step takes the output files written by all of the partitioners and downloads them to the local machine on which the reducer is running. These individual data pieces are then sorted by key into one larger data list. The purpose of this sort is to group equivalent keys together so that their values can be iterated over easily in the reduce task.

Reduce:

The reducer takes the grouped data as input and runs a reduce function once per key grouping. The function is passed the key and an iterator over all the values associated with that key. A wide range of processing can happen in this function: the data can be aggregated, filtered, and combined in a number of ways. Once it is done, the reducer sends zero or more key/value pairs to the final step, the output format.
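Continuing the word-count illustration, a sketch of the corresponding reduce function (placeholder class name) sums the counts for each word:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // the framework has already grouped the values by key;
        // we simply iterate over them and aggregate
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));
    }
}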

Output Format:

The output format translates the final key/value pairs from the reduce function and writes them out to a file via a record writer. By default, it separates the key and value with a tab character and separates records with a newline character. We will discuss how to write your own customized output format in future articles.

 
