Monday, 20 October 2014

MapReduce Code Testing

MapReduce code testing using MRUnit


Apache MRUnit lets you unit-test your MapReduce code. You can test your mapper and reducer in a
local environment first and then run the job on a cluster.


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import com.google.common.collect.ImmutableList;
import junit.framework.TestCase;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mrunit.*;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * MRUnit tests for a small inverted-index job written against the old
 * {@code org.apache.hadoop.mapred} API.
 *
 * <p>Input lines have the form {@code retailer,keyword1,keyword2,...}. The mapper emits one
 * {@code (keyword, retailer)} pair per keyword and the reducer joins all retailers collected for
 * a keyword into a single comma-separated value.
 */
public class MRJobTest {

  private MapDriver<LongWritable, Text, Text, Text> mapDriver;
  private ReduceDriver<Text, Text, Text, Text> reduceDriver;
  private MapReduceDriver<LongWritable, Text, Text, Text, Text, Text> mapReduceDriver;

  /**
   * Emits {@code (keyword, retailer)} for every column after the retailer column of a CSV line.
   *
   * <p>Declared {@code static} so it holds no hidden reference to the enclosing test instance;
   * a non-static inner class has no no-arg constructor and cannot be instantiated reflectively
   * by Hadoop.
   */
  public static class InvertedIndexMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    /** Column of the retailer name in each CSV record. */
    public static final int RETAIlER_INDEX = 0;

    @Override
    public void map(LongWritable longWritable, Text text,
        OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
      final String[] record = StringUtils.split(text.toString(), ",");
      // A blank line splits to an empty array; skip it rather than failing on record[0].
      if (record.length == 0) {
        return;
      }
      final String retailer = record[RETAIlER_INDEX];
      for (int i = 1; i < record.length; i++) {
        outputCollector.collect(new Text(record[i]), new Text(retailer));
      }
    }
  }

  /** Joins every retailer seen for a keyword into one comma-separated {@link Text} value. */
  public static class InvertedIndexReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text text, Iterator<Text> textIterator,
        OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
      // join() converts each value to a String as it iterates, so Hadoop's reuse of the
      // Text instance behind the iterator does not affect the joined result.
      final String retailers = StringUtils.join(textIterator, ',');
      outputCollector.collect(text, new Text(retailers));
    }
  }

  /** Creates fresh drivers around new mapper/reducer instances before every test. */
  @Before
  public void setUp() throws Exception {
    final InvertedIndexMapper mapper = new InvertedIndexMapper();
    final InvertedIndexReducer reducer = new InvertedIndexReducer();

    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

  /**
   * One keyword on the input line yields exactly one {@code (keyword, retailer)} pair.
   * runTest() fails on any unexpected extra output, so the input carries a single keyword.
   */
  @Test
  public void testMapperWithSingleKeyAndValue() throws Exception {
    final LongWritable inputKey = new LongWritable(0);
    final Text inputValue = new Text("www.kroger.com,groceries");

    final Text outputKey = new Text("groceries");
    final Text outputValue = new Text("www.kroger.com");

    mapDriver.withInput(inputKey, inputValue);
    mapDriver.withOutput(outputKey, outputValue);
    mapDriver.runTest();
  }

  /** Six keywords on one line produce six pairs, each mapping a keyword to the retailer. */
  @Test
  public void testMapperWithSingleInputAndMultipleOutput() throws Exception {
    final LongWritable key = new LongWritable(0);
    mapDriver.withInput(key, new Text("www.amazon.com,books,music,toys,ebooks,movies,computers"));
    final List<Pair<Text, Text>> result = mapDriver.run();

    final Pair<Text, Text> books = new Pair<Text, Text>(new Text("books"), new Text("www.amazon.com"));
    final Pair<Text, Text> toys = new Pair<Text, Text>(new Text("toys"), new Text("www.amazon.com"));

    assertNotNull(result);
    assertEquals(6, result.size());
    assertTrue("missing " + books, result.contains(books));
    assertTrue("missing " + toys, result.contains(toys));
  }

  /** The reducer joins all retailers for a keyword with commas, in iteration order. */
  @Test
  public void testReducer() throws Exception {
    final Text inputKey = new Text("books");
    final ImmutableList<Text> inputValue =
        ImmutableList.of(new Text("www.amazon.com"), new Text("www.ebay.com"));

    reduceDriver.withInput(inputKey, inputValue);
    reduceDriver.withOutput(inputKey, new Text("www.amazon.com,www.ebay.com"));
    reduceDriver.runTest();
  }

  /**
   * Runs mapper and reducer end to end. MRUnit's shuffle sorts keys, so "clothes" is expected
   * before "groceries".
   */
  @Test
  public void testMapReduce() throws Exception {
    mapReduceDriver.withInput(new LongWritable(0), new Text("www.kroger.com,groceries,clothes"));
    mapReduceDriver.withOutput(new Text("clothes"), new Text("www.kroger.com"));
    mapReduceDriver.withOutput(new Text("groceries"), new Text("www.kroger.com"));
    mapReduceDriver.runTest();
  }
}
Note: Add the MRUnit jar and the jars it depends on (JUnit, Guava, Commons Lang and the Hadoop client libraries used above) to the classpath.

Apache Web Log Analysis using PIG

Apache Web Log Analysis using PIG

Start the Pig shell (Grunt) in local mode by running 'pig -x local'.

Load the log file into Pig using the LOAD command.

grunt> raw_logs = LOAD '/home/hadoop/work/input/apacheLog.log'
           USING TextLoader() AS (line: chararray);  -- each raw log line becomes one chararray field

Parse each log line and assign each field to its own named variable.

-- Split each line with a regex into nine named fields; the two final quoted fields are the
-- referrer and the browser (user agent) string.
logs_base = FOREACH raw_logs GENERATE FLATTEN (REGEX_EXTRACT_ALL(line,'^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] "(.+?)" (\\S+) (\\S+) "([^"]*)" "([^"]*)"') )
AS (remoteAddr: chararray, remoteLogname: chararray, user: chararray,  time: chararray, 
request: chararray, status: int, bytes_string: chararray, referrer: chararray, browser: chararray);

We need only the time (time), IP address (remoteAddr) and user (remoteLogname) fields. Note that
the separate user field holds the authenticated user name. We project these three fields from each record into a new relation.

-- Keep only the three fields used by the aggregation below.
logs = FOREACH logs_base GENERATE remoteAddr, remoteLogname, time;

Next, we want the number of hits and the number of unique users for each timestamp.
In Pig we do this by grouping the records on a field or a combination of fields.
In our case, we group on the time field.

-- One group per distinct value of the time field; each group holds the matching logs records.
group_time = GROUP logs BY (time);

For each group, we need the number of hits and the number of unique users.
The number of hits is simply the number of log records in the group, which COUNT gives us.
The number of unique users is the COUNT of the DISTINCT user names in the group.

Putting it all together, the following statement computes the number of hits and the number of unique users
for each timestamp. In our data the unique-user count is always 1, because the user name is always '-'.

-- For each time group: the group key, the number of distinct remoteLogname values,
-- and the number of log records (hits) in the group.
X = FOREACH group_time { 
            unique_users = DISTINCT logs.remoteLogname;
            GENERATE FLATTEN(group), COUNT(unique_users) AS UniqueUsers,
            COUNT(logs) as counts;
       }

-- Pig evaluates lazily: nothing runs until a DUMP or STORE, so print the result rows here.
DUMP X;


(Each result row has the form: time, number of unique users, number of hits.)


Related Posts Plugin for WordPress, Blogger...