MR framework provides the MRunit testing for test your MR code. You can test your code in
local environment and then can run it on cluster.
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.junit.Before;
import org.junit.Test;
import org.apache.hadoop.mrunit.*;
import org.apache.hadoop.mrunit.types.Pair;
import com.google.common.collect.ImmutableList;
public class MRJobTest {
private MapDriver<LongWritable, Text, Text, Text> mapDriver;
private ReduceDriver<Text, Text, Text, Text> reduceDriver;
private MapReduceDriver<LongWritable, Text, Text, Text, Text, Text> mapReduceDriver;
public class InvertedIndexMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public static final int RETAIlER_INDEX = 0;
@Override
public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
final String[] record = StringUtils.split(text.toString(), ",");
final String retailer = record[RETAIlER_INDEX];
for (int i = 1; i < record.length; i++) {
final String keyword = record[i];
outputCollector.collect(new Text(keyword), new Text(retailer));
}
}
}
public class InvertedIndexReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text text, Iterator<Text> textIterator, OutputCollector<Text, Text> outputCollector, Reporter reporter)
throws IOException {
// TODO Auto-generated method stub
final String retailers = StringUtils.join(textIterator, ',');
outputCollector.collect(text, new Text(retailers));
}
}
@Before
public void setUp() throws Exception {
final InvertedIndexMapper mapper = new InvertedIndexMapper();
final InvertedIndexReducer reducer = new InvertedIndexReducer();
mapDriver = MapDriver.newMapDriver(mapper);
reduceDriver = ReduceDriver.newReduceDriver(reducer);
mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
}
@Test
public void testMapperWithSingleKeyAndValue() throws Exception {
final LongWritable inputKey = new LongWritable(0);
final Text inputValue = new Text("www.kroger.com,groceries,clothes");
final Text outputKey = new Text("groceries");
final Text outputValue = new Text("www.kroger.com");
mapDriver.withInput(inputKey, inputValue);
mapDriver.withOutput(outputKey, outputValue);
mapDriver.runTest();
}
@Test
public void testMapperWithSingleInputAndMultipleOutput() throws Exception {
final LongWritable key = new LongWritable(0);
mapDriver.withInput(key, new Text("www.amazon.com,books,music,toys,ebooks,movies,computers"));
final List<Pair<Text, Text>> result = mapDriver.run();
final Pair<Text, Text> books = new Pair<Text, Text>(new Text("books"), new Text("www.amazon.com"));
final Pair<Text, Text> toys = new Pair<Text, Text>(new Text("toys"), new Text("www.amazon.com"));
assertThat(result)
.isNotNull()
.hasSize(6)
.contains(books, toys);
}
@Test
public void testReducer() throws Exception {
final Text inputKey = new Text("books");
final ImmutableList<Text> inputValue = ImmutableList.of(new Text("www.amazon.com"), new Text("www.ebay.com"));
reduceDriver.withInput(inputKey,inputValue);
final List<Pair<Text, Text>> result = reduceDriver.run();
//final Pair<Text, Text> pair2 = new Pair<Text, Text>(inputKey, new Text("www.amazon.com,www.ebay.com"));
reduceDriver.withOutput(inputKey, new Text("www.amazon.com,www.ebay.com"));
/* assertThat(result)
.isNotNull()
.hasSize(1)
.containsExactly(pair2); */
reduceDriver.runTest();
}
}
Note: Add the mrunit jar and dependent jar.