Monday, 20 October 2014

Run the Chain MapReduce Job

Sometimes we need to run multiple dependent mappers and reducers chained together in a single MapReduce job (Map → Map → Reduce → Map). With the old mapred API, mappers added through ChainMapper run before the single reduce phase, and mappers added through ChainReducer run after it. Here is one example of such a chained MapReduce job.


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

import com.orienit.hadoop.training.Dictionary;
import com.orienit.hadoop.training.SgmParser;

public class ChainWordCountDriver extends Configured implements Tool { 

// TokenizerMapper - parses the input record and emits every token with a count of 1
public static class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);

    // SgmParser strips the SGML markup and removes the stop words
    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        line = SgmParser.parse(line);
        line = line.replaceAll("\\s+", " ").trim();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            output.collect(new Text(tokenizer.nextToken()), one);
        }
    }
}

// LowerCaserMapper - lowercases the token passed on by TokenizerMapper
// (it must be static so Hadoop can instantiate it)
public static class LowerCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String word = key.toString().toLowerCase();
        output.collect(new Text(word), value);
    }
}

// WordCountReducer - sums the counts for each token and emits the total
public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

// LastMapper - runs after the reducer; it matches each word against the
// WordNet dictionary and writes the word, its count and the matching
// synsets to the final output file
public static class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, Text> {

    public void map(Text key, IntWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        String word = key.toString();
        StringBuffer sbr = new StringBuffer();
        sbr.append(key.toString() + "\t" + value.toString());
        // System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
        String matched = Dictionary.match(word);
        output.collect(new Text(sbr.toString()), new Text(matched));
    }
}
@Override
public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);

    // Set the input and output paths
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    Path outputPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(conf, outputPath);

    // The input is XML, so use XmlInputFormat to hand each <TEXT>...</TEXT>
    // block to the first mapper; the output is written as plain text
    conf.setInputFormat(XmlInputFormat.class);
    conf.set("xmlinput.start", "<TEXT>");
    conf.set("xmlinput.end", "</TEXT>");
    conf.setOutputFormat(TextOutputFormat.class);
    conf.set("mapred.textoutputformat.separator", "\n");

    // addMapper takes the global conf object, the mapper class, the mapper's
    // input and output key/value types, a flag saying whether key/value pairs
    // are passed by value or by reference, and a local JobConf for this mapper
    JobConf mapAConf = new JobConf(false);
    ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);

    JobConf mapBConf = new JobConf(false);
    ChainMapper.addMapper(conf, LowerCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

    // Use the reducer as a map-side combiner as well
    conf.setCombinerClass(WordCountReducer.class);
    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    // Mappers added through ChainReducer run after the reduce phase
    JobConf mapCConf = new JobConf(false);
    ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, Text.class, true, mapCConf);

    JobClient.runJob(conf);
    return 0;
}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
    System.exit(res);
}}
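To build and launch the job, package these classes (along with SgmParser, Dictionary, XmlInputFormat and the stopwords.txt resource) into a jar and pass the input and output paths as arguments. The jar name and paths below are placeholders; adjust them to your environment:

hadoop jar chainwordcount.jar ChainWordCountDriver /user/hadoop/reuters/input /user/hadoop/reuters/output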
--------------SgmParser.java-------------------------

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.regex.Pattern;
import java.util.regex.Matcher;

public class SgmParser {

    public static String parse(String line) throws IOException {
        final String LINE_SEPARATOR = System.getProperty("line.separator");

        // stopwords.txt is loaded from the classpath, next to SgmParser
        InputStream fstream = SgmParser.class.getResourceAsStream("stopwords.txt");

        BufferedReader sfbr2 = null;
        String token = null;

        Pattern EXTRACTION_PATTERN = Pattern.compile("<BODY>(.*?)</BODY>");

        String[] META_CHARS = {"&", "<", ">", "\"", "'"};
        String[] META_CHARS_SERIALIZATIONS = {"&amp;", "&lt;", "&gt;", "&quot;", "&apos;"};

        int index = -1;
        StringBuffer buffer = new StringBuffer();
        StringBuffer buffer1 = new StringBuffer();
        String parse = "";
        String lt = "<";
        String gt = ">";

        // Collect the text that sits between a closing '>' and the next
        // opening '<', i.e. the content outside the tags
        for (int id = line.indexOf(lt); id >= 0; id = line.indexOf(lt, id + 1)) {
            int ct = line.indexOf(gt, id + 1);
            int ot = (ct == -1) ? -1 : line.indexOf(lt, ct + 1);
            if (ot != -1)
                buffer1.append(line.substring(ct + 1, ot)).append(" ");
        }
        if (buffer1.length() == 0) {
            buffer1.append(line);
        }
        parse = buffer1.toString().toLowerCase();
        parse = parse.replaceAll("[^a-zA-Z]", " ");
        parse = parse.replaceAll("\\s+", " ").trim();

        if ((index = parse.indexOf("</REUTERS")) == -1) {
            // Accumulate the strings for now, then apply the regular
            // expression to get the pieces
            buffer.append(parse).append(' ');
        } else {
            // Extract the relevant pieces (the <BODY> contents)
            Matcher matcher = EXTRACTION_PATTERN.matcher(parse);
            while (matcher.find()) {
                for (int i = 1; i <= matcher.groupCount(); i++) {
                    if (matcher.group(i) != null) {
                        buffer.append(matcher.group(i));
                    }
                    buffer.append(LINE_SEPARATOR).append(LINE_SEPARATOR);
                }
            }
        }

        // Replace the SGML escape sequences with their literal characters
        String out = buffer.toString();
        for (int i = 0; i < META_CHARS_SERIALIZATIONS.length; i++) {
            out = out.replaceAll(META_CHARS_SERIALIZATIONS[i], META_CHARS[i]);
        }

        // Remove every stop word listed in stopwords.txt
        sfbr2 = new BufferedReader(new InputStreamReader(fstream, "UTF-8"));
        while ((token = sfbr2.readLine()) != null) {
            out = out.replaceAll("\\b" + token.trim() + "\\b", "");
        }
        sfbr2.close();
        return out;
    }
}
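A quick way to sanity-check the parser outside Hadoop is a tiny driver like the sketch below. The input line is made up, and stopwords.txt must be on the classpath next to SgmParser; expect lowercase letters only, with stop words such as "the" removed.

public class SgmParserTest {
    public static void main(String[] args) throws Exception {
        // Hypothetical record; real input comes from the Reuters SGML files
        String line = "<BODY>The Quick Brown Fox, 1987!</BODY>";
        System.out.println(SgmParser.parse(line)); // e.g. "quick brown fox"
    }
}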

----------------------------Dictionary.java--------------------------------

import edu.smu.tspell.wordnet.Synset;
import edu.smu.tspell.wordnet.WordNetDatabase;

public class Dictionary {

    public static String match(String searchword) {
        // Get the synsets containing the word form
        WordNetDatabase database = WordNetDatabase.getFileInstance();
        Synset[] synsets = database.getSynsets(searchword);

        StringBuffer sbfr = new StringBuffer();
        // Append the word forms and definitions for the synsets retrieved
        if (synsets.length > 0) {
            for (int i = 0; i < synsets.length; i++) {
                String[] wordForms = synsets[i].getWordForms();
                for (int j = 0; j < wordForms.length; j++) {
                    sbfr.append((j > 0 ? ", " : "") + wordForms[j]);
                }
                sbfr.append(": " + synsets[i].getDefinition() + "\n");
            }
        } else {
            sbfr.append("Not Found");
        }
        return sbfr.toString();
    }
}
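JAWS locates the WordNet data files through the wordnet.database.dir system property (the commented-out System.setProperty calls above hint at this), so it must be set before the first lookup. A minimal sketch; the dictionary path is an assumption, adjust it to your install:

public class DictionaryTest {
    public static void main(String[] args) {
        // Assumed location of the WordNet data files
        System.setProperty("wordnet.database.dir", "/home/hadoop/WordnetDictionary/dict");
        // Prints each synset's word forms followed by its definition,
        // or "Not Found" when the word is not in WordNet
        System.out.println(Dictionary.match("bank"));
    }
}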

XMLInputFormat for old mapred API

Parse XML files with XmlInputFormat, which hands each block between a configured start tag and end tag to the mapper as a single record.
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public class XmlInputFormat extends TextInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  @Override
  public RecordReader<LongWritable,Text> getRecordReader(InputSplit inputSplit,
                                                         JobConf jobConf,
                                                         Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) inputSplit, jobConf);
  }

  /**
   * XMLRecordReader class to read through a given xml document to output xml
   * blocks as records as specified by the start tag and end tag
   *
   */
  public static class XmlRecordReader implements
      RecordReader<LongWritable,Text> {
    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();

    public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
      startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
      endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

      // open the file and seek to the start of the split
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(jobConf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
      if (fsin.getPos() < end) {
        if (readUntilMatch(startTag, false)) {
          try {
            buffer.write(startTag);
            if (readUntilMatch(endTag, true)) {
              key.set(fsin.getPos());
              value.set(buffer.getData(), 0, buffer.getLength());
              return true;
            }
          } finally {
            buffer.reset();
          }
        }
      }
      return false;
    }

    @Override
    public LongWritable createKey() {
      return new LongWritable();
    }

    @Override
    public Text createValue() {
      return new Text();
    }

    @Override
    public long getPos() throws IOException {
      return fsin.getPos();
    }

    @Override
    public void close() throws IOException {
      fsin.close();
    }

    @Override
    public float getProgress() throws IOException {
      return (fsin.getPos() - start) / (float) (end - start);
    }

    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException    {
      int i = 0;
      while (true) {
        int b = fsin.read();
        // end of file:
        if (b == -1) return false;
        // save to buffer:
        if (withinBlock) buffer.write(b);

        // check if we're matching:
        if (b == match[i]) {
          i++;
          if (i >= match.length) return true;
        } else i = 0;
        // see if we've passed the stop point:
        if (!withinBlock && i == 0 && fsin.getPos() >= end) return false;
      }
    }
  }
}
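Wiring the input format into a job mirrors what the driver above does: set XmlInputFormat as the input format and configure the start and end tags. A minimal sketch using the same tags as the driver:

JobConf conf = new JobConf(ChainWordCountDriver.class);
conf.setInputFormat(XmlInputFormat.class);
conf.set(XmlInputFormat.START_TAG_KEY, "<TEXT>");
conf.set(XmlInputFormat.END_TAG_KEY, "</TEXT>");
// Each map() call now receives one complete <TEXT>...</TEXT> block as its
// Text value, keyed by the byte position where the block ends.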