Tuesday, 21 October 2014

Hadoop in Practice
By Alex Holmes

Working with simple data formats such as log files is straightforward and well supported in MapReduce. In this article, based on Chapter 3 of Hadoop in Practice, author Alex Holmes shows you how to work with ubiquitous data serialization formats such as XML and JSON.


Processing Common Serialization Formats

XML and JSON are industry-standard data interchange formats, widely adopted for data storage and exchange. XML has existed since 1998 as a mechanism to represent data that is readable by machines and humans alike. It became a universal language for data exchange between systems and is employed by many standards today, such as SOAP and RSS, and is used as an open data format for products such as Microsoft Office.

Technique 1: MapReduce and XML

Our goal is to be able to use XML as a data source for a MapReduce job. We’re going to assume that the XML documents that need to be processed are large and, as a result, we want to be able to process them in parallel with multiple mappers working on the same input file.

Problem

Working on a single XML file in parallel in MapReduce is tricky because XML does not contain a synchronization marker in its data format. Therefore, how do we work with a file format that’s not inherently splittable like XML?

Solution

MapReduce doesn’t contain built-in support for XML, so we have to turn to another Apache project, Mahout, a machine learning system, which provides an XML InputFormat. To showcase the XML InputFormat, let’s write a MapReduce job that uses Mahout’s XML InputFormat to read property names and values from Hadoop’s
configuration files. Our first step is to set up our job configuration.
conf.set("xmlinput.start", "<property>");            #1
conf.set("xmlinput.end", "</property>");             #2
job.setInputFormatClass(XmlInputFormat.class);       #3
#1 Defines the string form of the XML start tag. Our job is to take Hadoop config files as input, where each configuration entry uses the "property" tag.
#2 Defines the string form of the XML end tag.
#3 Sets the Mahout XML input format class. 
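For context, here is a minimal driver sketch (an assumption, not the book's code) showing where these settings fit into a complete job; it wires in the Map and Reduce classes defined later in this technique, and the import for Mahout's XmlInputFormat is omitted because its package depends on the Mahout version you use.

// Hypothetical driver sketch: wires the configuration above into a Job
// using Mahout's XmlInputFormat and the Map/Reduce classes shown below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class XmlMapReduceDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("xmlinput.start", "<property>");
    conf.set("xmlinput.end", "</property>");

    Job job = new Job(conf);                       // Job.getInstance(conf) on newer APIs
    job.setJarByClass(XmlMapReduceDriver.class);
    job.setInputFormatClass(XmlInputFormat.class); // Mahout's XML input format
    job.setMapperClass(Map.class);                 // Mapper shown later in this article
    job.setReducerClass(Reduce.class);             // Reducer shown later in this article
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}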
It quickly becomes apparent by looking at the code that Mahout’s XML InputFormat is rudimentary; you need to tell it the exact start and end XML tags that will be searched for in the file. Looking at the source of the InputFormat confirms this:
private boolean next(LongWritable key, Text value)
    throws IOException {
  if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
    try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
        key.set(fsin.getPos());
        value.set(buffer.getData(), 0, buffer.getLength());
        return true;
      }
    } finally {
      buffer.reset();
    }
  }
  return false;
}

Next, we need to write a Mapper to consume Mahout’s XML input format. We’re being supplied the XML element in
Text form, so we’ll need to use an XML parser to extract content from the XML.
public static class Map extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
    try {
      XMLStreamReader reader = XMLInputFactory.newInstance()
          .createXMLStreamReader(new ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();
      context.write(new Text(propertyName.trim()),
          new Text(propertyValue.trim()));
    } catch (Exception e) {
      log.error("Error processing '" + document + "'", e);
    }
  }
}

Our Map is given a Text instance, which contains a String representation of the data between the start and end tags. In our code we’re using Java’s built-in Streaming API for XML (StAX) parser to extract the key and value for each property and output them. If we run our MapReduce job against Cloudera’s core-site.xml and cat the output, we’ll see the output that you see below.
$ hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml

$ bin/run.sh com.manning.hip.ch3.xml.HadoopPropertyXMLMapReduce \
  core-site.xml output

$ hadoop fs -cat output/part*
fs.default.name hdfs://localhost:8020
hadoop.tmp.dir /var/lib/hadoop-0.20/cache/${user.name}
hadoop.proxyuser.oozie.hosts *
hadoop.proxyuser.oozie.groups *

This output shows that we have successfully worked with XML as an input serialization format with MapReduce! Not only that, we can support huge XML files since the InputFormat supports splitting XML.

WRITING XML

Having successfully read XML, the next question would be how do we write XML? In our Reducer, we have callbacks
that occur before and after our main reduce method is called, which we can use to emit a start and end tag.
public static class Reduce extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    context.write(new Text("<configuration>"), null);            #1
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    context.write(new Text("</configuration>"), null);           #2
  }

  private Text outputKey = new Text();

  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      outputKey.set(constructPropertyXml(key, value));           #3
      context.write(outputKey, null);                            #4
    }
  }

  public static String constructPropertyXml(Text name, Text value) {
    StringBuilder sb = new StringBuilder();
    sb.append("<property><name>").append(name)
      .append("</name><value>").append(value)
      .append("</value></property>");
    return sb.toString();
  }
}
#1 Uses the setup method to write the root element start tag.
#2 Uses the cleanup method to write the root element end tag.
#3 Constructs a child XML element for each key/value combination we get in the Reducer.
#4 Emits the XML element.
This could also be embedded in an OutputFormat.
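If you want to go down that route, a rough sketch of such an OutputFormat might look like the following; this is an illustrative assumption, not code from the book, and it assumes the job emits fully formed <property> elements as keys.

// Hypothetical XML OutputFormat sketch: writes a root start tag when the
// writer is created, one element per record, and the root end tag on close.
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class XmlOutputFormat extends TextOutputFormat<Text, Text> {
  @Override
  public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
      throws IOException, InterruptedException {
    Path file = getDefaultWorkFile(job, ".xml");
    FileSystem fs = file.getFileSystem(job.getConfiguration());
    final DataOutputStream out = fs.create(file, false);
    out.writeBytes("<configuration>\n");          // root element start tag
    return new RecordWriter<Text, Text>() {
      @Override
      public void write(Text key, Text value) throws IOException {
        // key already holds a complete <property>...</property> element
        out.writeBytes(key.toString() + "\n");
      }
      @Override
      public void close(TaskAttemptContext context) throws IOException {
        out.writeBytes("</configuration>\n");     // root element end tag
        out.close();
      }
    };
  }
}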

PIG

If you want to work with XML in Pig, the Piggybank library (a user-contributed library of useful Pig code) contains an XMLLoader. It works in a similar way to our technique and captures all of the content between a start and end tag and supplies it as a single bytearray field in a Pig tuple.

HIVE

Currently, there doesn’t seem to be a way to work with XML in Hive. You would have to write a custom SerDe[1].

Discussion

Mahout’s XML InputFormat certainly helps you work with XML. However, it’s very sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or the generation of the element can’t be controlled and could result in XML namespace qualifiers being used, then this approach may not work for you. Also problematic will be situations where the element name you specify is used as a descendant child element.
If you have control over how the XML is laid out in the input, this exercise can be simplified by having a single XML element per line. This will let you use the built-in MapReduce text-based InputFormats (such as TextInputFormat), which treat each line as a record and split accordingly to preserve that demarcation.
Another option worth considering is a preprocessing step, where you convert the original XML into a separate line per XML element, or convert it into an altogether different data format such as a SequenceFile or Avro, both of which solve the splitting problem for you.
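As a rough illustration of that preprocessing idea, the sketch below (an assumption, not the book's code, with illustrative paths and key choice) copies a file that already has one XML element per line into a SequenceFile, which downstream jobs can then split freely.

// Hypothetical preprocessing sketch: copy line-per-element XML into a
// SequenceFile so downstream MapReduce jobs get splittability for free.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class XmlToSequenceFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path in = new Path(args[0]);   // text file, one XML element per line
    Path out = new Path(args[1]);  // SequenceFile to create
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, out, LongWritable.class, Text.class);
    BufferedReader reader =
        new BufferedReader(new InputStreamReader(fs.open(in)));
    try {
      String line;
      long lineNumber = 0;
      while ((line = reader.readLine()) != null) {
        // key = a running line counter, value = the XML element on that line
        writer.append(new LongWritable(lineNumber++), new Text(line));
      }
    } finally {
      reader.close();
      writer.close();
    }
  }
}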
Hadoop Streaming also provides a StreamXmlRecordReader class that lets you work with XML in your streaming code.
We have a handle on how to work with XML, so let’s move on to tackle another popular serialization format, JSON. JSON shares the machine and human-readable traits of XML and has existed since the early 2000s. It is less verbose than XML and doesn’t have the rich typing and validation features available in XML.

Technique 2: MapReduce and JSON

Our technique covers how you can work with JSON in MapReduce. We’ll also cover a method by which a JSON file can be partitioned for concurrent reads.

Problem

Figure 1 shows us the problem with using JSON in MapReduce. If you are working with large JSON files, you need to be able to split them. But, given a random offset in a file, how do we determine the start of the next JSON element, especially when working with JSON that has multiple hierarchies such as in the example below?



Figure 1 Example of issue with JSON and multiple input splits

Solution

JSON is harder to partition into distinct segments than a format such as XML because JSON doesn’t have a token (like an end tag in XML) to denote the start or end of a record.
ElephantBird[2], an open-source project that contains some useful utilities for working with LZO compression, has a LzoJsonInputFormat, which can read JSON, but it requires that the input file be LZOP compressed. We’ll use this code as a template for our own JSON InputFormat, which doesn’t have the LZOP compression requirement.

We’re cheating with our solution because we’re assuming that each JSON record is on a separate line. Our JsonRecordFormat is simple and does nothing other than construct and return a JsonRecordReader, so we’ll skip over that code. The JsonRecordReader emits LongWritable, MapWritable key/value pairs to the Mapper, where the Map is a map of JSON element names and their values. Let’s take a look at how this RecordReader works. It leverages the LineRecordReader, which is a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, it uses the following method.
public static boolean decodeLineToJson(JSONParser parser, Text line,
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject) parser.parse(line.toString());
    for (Object key : jsonObj.keySet()) {
      Text mapKey = new Text(key.toString());
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }

      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}

It uses the json-simple[3] parser to parse the line into a JSON object and then iterates over the keys, putting the keys and values into a MapWritable. The Mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accordingly. The code for the MapReduce job is very basic. We’re going to demonstrate the code using the JSON below.
{
  "results" :
  [
    {
      "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000",
      "from_user" : "grep_alex",
      "text" : "RT @kevinweil: After a lot of hard work by ..."
    },
    {
      "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000",
      "from_user" : "grep_alex",
      "text" : "@miguno pull request has been merged, thanks again!"
    }
  ]
}

Since our technique assumes a JSON object per line, the actual JSON file we’ll work with is shown below.
{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...
{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...

We’ll copy the JSON file into HDFS and run our MapReduce code. Our MapReduce code simply writes each JSON
key/value to the output.
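The book's JsonMapReduce class isn't reproduced here, but a minimal sketch of such a mapper (an assumption, using a hypothetical class name and assuming the job runs map-only or with an identity reducer) could look like this:

// Hypothetical mapper sketch: receives the LongWritable/MapWritable pairs
// produced by the JSON record reader and emits each field name and value.
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class JsonFieldMapper
    extends Mapper<LongWritable, MapWritable, Text, Text> {

  @Override
  protected void map(LongWritable key, MapWritable value, Context context)
      throws IOException, InterruptedException {
    // each entry is a JSON field name and its value, as decoded by the
    // record reader described above
    for (Entry<Writable, Writable> entry : value.entrySet()) {
      context.write((Text) entry.getKey(), (Text) entry.getValue());
    }
  }
}

Running such a job produces output like that shown next.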
$ hadoop fs -put test-data/ch3/singleline-tweets.json \
  singleline-tweets.json

$ bin/run.sh com.manning.hip.ch3.json.JsonMapReduce \
  singleline-tweets.json output

$ hadoop fs -cat output/part*
text RT @kevinweil: After a lot of hard work by ...
from_user grep_alex
created_at Thu, 29 Dec 2011 21:46:01 +0000
text @miguno pull request has been merged, thanks again!
from_user grep_alex
created_at Mon, 26 Dec 2011 21:18:37 +0000

WRITING JSON

An approach similar to what we looked at for writing XML could also be used to write JSON.
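For instance, a hypothetical reducer (not from the book) could use json-simple to build one JSON object per record and emit its string form, one object per line; the class and field names below are purely illustrative.

// Hypothetical reducer sketch: mirror image of the XML-writing technique,
// emitting one JSON object per output line using json-simple.
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.json.simple.JSONObject;

public class JsonWriteReducer
    extends Reducer<Text, Text, Text, NullWritable> {

  private Text outputKey = new Text();

  @Override
  @SuppressWarnings("unchecked")
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      JSONObject json = new JSONObject();
      json.put("name", key.toString());      // illustrative field names
      json.put("value", value.toString());
      outputKey.set(json.toJSONString());
      context.write(outputKey, NullWritable.get());  // one JSON object per line
    }
  }
}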

PIG

ElephantBird contains a JsonLoader and an LzoJsonLoader, which can be used to work with JSON in Pig. They also work with line-based JSON. Each Pig tuple contains a chararray field for each JSON element in the line.

HIVE

Hive contains a DelimitedJSONSerDe, which can serialize JSON but unfortunately not deserialize it, so you can’t load data into Hive using this SerDe.

Discussion

Our solution works with the assumption that the JSON input is structured with a line per JSON object. How would we work with JSON objects that are across multiple lines? The authors have an experimental project on GitHub[4], which works with multiple input splits over a single JSON file. The key to this approach is searching for a specific JSON member and retrieving the containing object. There’s a Google Code project called hive-json-serde[5], which can support both serialization and deserialization.

Summary

As you can see, using XML and JSON in MapReduce is kludgy and has rigid requirements about how your data is laid out. Supporting them in MapReduce is complex and error prone, as they don’t naturally lend themselves to splitting. Alternative file formats, such as Avro and SequenceFiles, have built-in support for splittability.


If you would like to purchase Hadoop in Practice, DZone members can receive a 38% discount by entering the Promotional Code: dzone38 during checkout at Manning.com.



[1] SerDe is a shortened form of Serializer/Deserializer, the mechanism that allows Hive to read and write data in HDFS.
[2] https://github.com/kevinweil/elephant-bird
[3] http://code.google.com/p/json-simple/
[4] A multiline JSON InputFormat. https://github.com/alexholmes/json-mapreduce.
[5] http://code.google.com/p/hive-json-serde/


Source: http://www.manning.com/holmes/

Scheduling Workflows Using Oozie Coordinator

Introduction 

In the Hadoop ecosystem, most functionality, such as MapReduce jobs, Pig scripts, and Hive queries, is executed as batch jobs. This creates a lot of overhead in the deployment and maintenance of Hadoop components. As a solution, Oozie provides workflows in XML format with which we can combine multiple MapReduce jobs into a logical unit of work that accomplishes a larger task [4]. This helps in chaining related MapReduce jobs, which can also be Hive queries or Pig scripts.
Workflows work perfectly well when invoked on demand or manually. But to achieve a higher level of automation and effectiveness, it becomes necessary to run them based on one or more of the following parameters: regular time intervals, data availability, or external events. In those cases, we need more functionality than Oozie workflows provide.
In this article, Oozie Coordinator jobs will be discussed, which provide options to embed workflows and trigger them at regular time intervals or on the basis of data availability.

The Oozie Coordinator allows conditions that trigger the execution of a workflow to be expressed in the form of predicates [1]. These predicates are conditional statements on parameters such as time, data, and external events. Only if the predicate is satisfied is the workflow job/action started.

Oozie Coordinator System

As stated on the Oozie documentation page [1], “Oozie is a Java Web-Application that runs in a Java servlet-container”. It uses XML for taking configuration input from the user and uses a database (the default is Derby, but MySQL, HSQLDB, or any other RDBMS can also be used) to store:

  • Definitions of Workflow and Coordinator
  • Currently running workflow and Coordinator instances, including instance states, configuration variables and parameters.
Oozie Coordinator is a collection of predicates (conditional statements based on time-frequency and data availability) and actions (i.e., Hadoop Map/Reduce jobs, Hadoop file system, Hadoop Streaming, Pig, Java, and Oozie sub-workflows). Actions are recurrent workflow jobs invoked each time a predicate returns true.

Oozie version 2 and higher supports Coordinator jobs. A Coordinator job is defined in the XML Process Definition Language.

Predicates are conditional statements, defined using the attributes "interval", "start-time", and "end-time" for time-based triggering, and the XML tags "dataset" and "input-events" for data-availability-based triggering of workflows.
Actions are the mechanism by which a workflow is triggered to execute a computation/processing task. An action contains a description of one or more workflows to be executed.

Oozie is lightweight, as it uses the existing Hadoop Map/Reduce framework for executing all tasks in a workflow. This approach allows it to leverage the existing Hadoop installation for scalability, reliability, parallelism, etc.
On the basis of functionality, Coordinators can be subdivided into two major groups [2]:
1. Time-Based Coordinator: This type of Coordinator definition is used for invoking the workflow repeatedly at a given interval within a specified period of time.
2. File-Based Coordinator: This type of Coordinator definition is used for invoking the workflow on the basis of data availability and data polling.
2.1 Simple File-Based Coordinator: The action is invoked whenever the data-available predicate is true.
2.2 Sliding Window-Based Coordinator: It is invoked frequently, and data is aggregated over multiple overlapping previous instances. For example, invoking it at a frequency of 5 minutes and running the action on data aggregated from the previous 4 instances of 15-minute data.
2.3 Rollups-Based Coordinator: It is invoked after a long period of time, and data is aggregated over multiple previous instances since the last time of invocation. For example, it might run once a day and trigger a workflow that aggregates 24 instances of hourly data.

Oozie Coordinator Components and Variables

  • Coordinator-App: It is a wrapper component that defines the attributes of a coordinator and includes all other components.
Attributes are:
  • start, end: describe the start and end time in yyyy-mm-ddThh:mmZ format.
  • timezone: describes the time zone (the value of Z in the above time format), such as UTC.
  • Controls: It contains parameters like timeout, concurrency, etc. to configure the execution of the coordinator job.
  • Datasets: It contains the definition of multiple data sources and frequency of data polling.
Attributes are:
  • Frequency: interval of time at which data polling is done.
  • Initial-Instance: start time of data polling in yyyy-mm-ddThh:mmZ format.
  • Uri-Template: URI of the data source. Expression language can be used; for example, ${YEAR} corresponds to the current year. This helps in dynamic selection of data source directories.
  • Done-flag: This flag denotes the success of data polling. It can be a file, in which case the presence of the file is checked before calling the action. Otherwise it can be left empty, for an implicit success message.
  • Input-Events:  denotes the processing of the input data before running the action.
  • Data-in: denotes the aggregated output data of the input-event.
  • Start-instance and end-instance: the boundary of data instances that need to be aggregated.
  • Output-Events: denotes the processing of the output data after running the action.
  • Data-out: denotes the output dataset.
  • Instance: the instance of the dataset to be used as the sink for output.
  • Action: includes the path of the workflow that has to be invoked when the predicate returns true.
It could also be configured to record the events required to evaluate SLA compliance.

Oozie Coordinator Lifecycle Operations


The lifecycle operations of a coordinator are similar to those of an Oozie workflow, except for the start operation. “Start” is not applicable for coordinators.
  • Submit/Run: Both operations submit the coordinator job to Oozie. The job will be in PREP state until the coordinator’s specified start time. 
  • Suspend: Suspends/pauses the coordinator job. 
  • Resume: Resumes the execution of the coordinator job. 
  • Kill: Kills the coordinator job and ends its execution. 
  • reRun: Re-submits the coordinator job/actions with new parameters. 

Oozie Coordinator Example   

In this section, we will see how to use the Oozie Coordinator for scheduling and triggering workflows.

  • A Sample Workflow: First of all, we need an Oozie workflow job. For example purposes, I have taken the simple wordcount example provided by the Apache Hadoop distribution in hadoop-examples-0.20.2-cdh3u0.jar [6].
The workflow for wordcount is:
<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-main-wf'>
  <start to='mapreduce-wordcount-example' />
  <action name='mapreduce-wordcount-example'>
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>default</value>
        </property>
      </configuration>
      <main-class>org.apache.hadoop.examples.ExampleDriver</main-class>
      <arg>wordcount</arg>
      <arg>${inputDir}</arg>
      <arg>${outputDir}</arg>
    </java>
    <ok to="end" />
    <error to="fail" />
  </action>
  <kill name="fail">
    <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name='end' />
</workflow-app>
Once the workflow is created, it has to be deployed correctly. A typical Oozie deployment is an HDFS directory containing workflow.xml and a lib subdirectory containing the jar files of classes used by workflow actions.
For example, the directory structure in Hadoop will be as shown below (if user.name is training):
[training@localhost ~]$ hadoop dfs -ls /user/training/oozie/workflow/wordcount
Found 2 items
drwxr-xr-x   - training supergroup          0 2012-09-18 12:05 /user/training/oozie/workflow/wordcount/lib
-rw-r--r--   1 training supergroup        918 2012-09-18 11:47 /user/training/oozie/workflow/wordcount/workflow.xml
The job.properties file will have following properties:
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
inputDir=${nameNode}/data.in
outputDir=${nameNode}/out
user.name=training
oozie.wf.application.path=${nameNode}/user/${user.name}/oozie/workflow/wordcount/
With the job properties in place, this workflow can be invoked manually from the command line using the oozie job command.
[training@localhost Desktop]$ oozie job -oozie=http://localhost:11000/oozie/ -config oozie/wordcount-demo/workflow/job.properties -run;
job: 0000000-120918134457517-oozie-oozi-W
  • Oozie Coordinator Definition: As discussed above, coordinator definitions differ for different kinds of triggering and scheduling. So, we will take each kind of Coordinator one by one and schedule the wordcount example on the basis of it.
Moreover, Oozie coordinators can be parameterized using variables like ${inputDir}, ${startTime}, etc. within the coordinator definition. When submitting a coordinator job, values for the parameters must be provided as input. As parameters are key-value pairs, they can be written in a job.properties file or an XML file. Parameters can also be provided in the form of a Java Map object when using the Java API to invoke a coordinator job.
  • Time-Based Coordinator
The generic definition for this kind of coordinator is
<coordinator-app name="coordinator1" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
  <action>
    <workflow>
      <app-path>${workflowPath}</app-path>
    </workflow>
  </action>
</coordinator-app>
Save the file as coordinator.xml in an HDFS directory. (Please note that coordinator.xml is the only name that can be given to the file, as Oozie uses this default name when reading the file from the HDFS directory.)
The coordinatorjob.properties can be defined as
frequency=60
startTime=2012-08-31T20\:20Z
endTime=2013-08-31T20\:20Z
timezone=GMT+0530
workflowPath=${nameNode}/user/${user.name}/oozie/workflow/wordcount/
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
inputDir=${nameNode}/data.in
outputDir=${nameNode}/out
oozie.coord.application.path=${nameNode}/user/${user.name}/coordOozie/coordinatorTimeBased
The coordinator application path must be specified in the file with the oozie.coord.application.path property. The specified path must be an HDFS path.
  • File-Based Coordinator
<coordinator-app name="coordinator1" frequency="${frequency}" start="${startTime}" end="${endTime}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
  <datasets>
    <dataset name="input1" frequency="${datasetfrequency}" initial-instance="${datasetinitialinstance}" timezone="${datasettimezone}">
      <uri-template>${dataseturitemplate}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
      <done-flag> </done-flag>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="coordInput1" dataset="input1">
      <start-instance>${inputeventstartinstance}</start-instance>
      <end-instance>${inputeventendinstance}</end-instance>
    </data-in>
  </input-events>
  <action>
    <workflow>
      <app-path>${workflowPath}</app-path>
    </workflow>
  </action>
</coordinator-app>
Save the file as coordinator.xml in an HDFS directory. (Please note that coordinator.xml is the only name that can be given to the file, as Oozie uses this default name when reading the file from the HDFS directory.)
The coordinatorjob.properties can be defined as
frequency=60
startTime=2012-08-21T15:25Z
endTime=2012-08-22T15:25Z
timezone=UTC
datasetfrequency=15
datasetinitialinstance=2012-08-21T15:30Z
datasettimezone=UTC
dataseturitemplate=${namenode}/user/hadoop/oozie/coordinator/in
inputeventstartinstance=${coord:current(0)}
inputeventendinstance=${coord:current(0)}
workflowPath=${nameNode}/user/${user.name}/oozie/workflow/wordcount/
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
inputDir= ${coord:dataIn('coordInput1')}
outputDir=${nameNode}/out
oozie.coord.application.path=${nameNode}/user/${user.name}/coordOozie/coordinatorFileBased
The coordinator application path must be specified in the file with the oozie.coord.application.path property. The specified path must be an HDFS path.
  • Sliding-Window Based Coordinator
This is a specific use case of the File-Based Coordinator where the coordinator is invoked frequently and data is aggregated over multiple overlapping previous instances.
The rule for this can be generalized as
Coordinator-frequency < DataSet-Frequency
For example, the coordinator job.properties will be like
frequency=5

datasetfrequency=15
……
  • Rollups Based Coordinator
This is a specific use case of the File-Based Coordinator where the coordinator is invoked after a long period of time and data is aggregated over multiple previous instances since the last time of invocation. 

The rule for this can be generalized as
Coordinator-frequency > DataSet-Frequency
frequency=1440
….
datasetfrequency=60
…….

Running Coordinator Example from Command line

  • Submitting/Running the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -config coordinatorjob.properties [-submit][-run]
job: 0000672-120823182447665-oozie-hado-C
The parameters for the job must be provided in a file, either a Java Properties file (.properties) or a Hadoop XML Configuration file (.xml). This file must be specified with the -config option.
  • Suspending the coordinator job
$ oozie job -oozie http://localhost:11000/oozie -suspend 0000673-120823182447665-oozie-hado-C
  • Resuming a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -resume 0000673-120823182447665-oozie-hado-C
  • Killing a Coordinator Job
$ oozie job -oozie http://localhost:11000/oozie -kill 0000673-120823182447665-oozie-hado-C
  • Rerunning a Coordinator Action or Multiple Actions
$ oozie job -rerun 0000673-120823182447665-oozie-hado-C [-nocleanup] 
[-refresh][-action 1,3-5] [-date 2012-01-01T01:00Z::2012-05-31T23:59Z, 2012-11-10T01:00Z, 2012-12-31T22:00Z] 
Either -action or -date is required to rerun. If neither -action nor -date is given, an exception will be thrown.
  • Checking the Status of a Coordinator/Workflow job or a Coordinator Action
$ oozie job -oozie http://localhost:11000/oozie -info 0000673-20823182447665-oozie-hado-C
The -info option displays information about a workflow job, coordinator job, or coordinator action.

Invoking Coordinator Jobs from Java Client

Oozie exposes a Java API for invoking and controlling workflows programmatically. The same API also applies to coordinators, but with some changes, as coordinators and workflows differ in how they function.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

// The service for executing coordinators on Oozie
public class CoordinatorOozieService {

  // Oozie client
  OozieClient oozieClient = null;

  public CoordinatorOozieService(String url) {
    oozieClient = new OozieClient(url);
  }

  // To submit the coordinator job on Oozie from a properties file
  public String submitJob(String jobPropertyFilePath)
      throws OozieClientException, IOException {

    // create a coordinator job configuration object
    // (createConfiguration() pre-populates user.name with the JVM user)
    Properties conf = oozieClient.createConfiguration();
    conf.setProperty("user.name", "training");

    // set the coordinator properties
    conf.load(new FileInputStream(jobPropertyFilePath));

    // submit the coordinator job
    return oozieClient.submit(conf);
  }

  // To submit the coordinator job on Oozie from a Properties object
  public String submitJob(Properties workflowProperties)
      throws OozieClientException, IOException {

    // create a coordinator job configuration object
    // (createConfiguration() pre-populates user.name with the JVM user)
    Properties conf = oozieClient.createConfiguration();

    // set the coordinator properties
    conf.putAll(workflowProperties);
    conf.setProperty("user.name", "training");

    // submit the coordinator job
    return oozieClient.submit(conf);
  }

  // To run (submit and start) the coordinator job on Oozie
  public String runJob(String jobPropertyFilePath)
      throws OozieClientException, IOException {

    // create a coordinator job configuration object
    // (createConfiguration() pre-populates user.name with the JVM user)
    Properties conf = oozieClient.createConfiguration();
    conf.setProperty("user.name", "training");

    // set the coordinator properties
    conf.load(new FileInputStream(jobPropertyFilePath));

    // submit and start the coordinator job
    return oozieClient.run(conf);
  }

  // To suspend the coordinator job on Oozie
  public void suspendJob(String jobId) throws OozieClientException {
    oozieClient.suspend(jobId);
  }

  // To resume the coordinator job on Oozie
  public void resumeJob(String jobId) throws OozieClientException {
    oozieClient.resume(jobId);
  }

  // To kill the coordinator job on Oozie
  public void killJob(String jobId) throws OozieClientException {
    oozieClient.kill(jobId);
  }

  // To get the status of the coordinator job with id <jobID>
  public CoordinatorJob.Status getJobStatus(String jobID)
      throws OozieClientException {
    CoordinatorJob job = oozieClient.getCoordJobInfo(jobID);
    return job.getStatus();
  }
}
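A hypothetical usage of this service (the Oozie URL and the properties file name below are illustrative assumptions) could look like:

// Sketch of a client using the CoordinatorOozieService class above.
public class CoordinatorClientExample {
  public static void main(String[] args) throws Exception {
    CoordinatorOozieService service =
        new CoordinatorOozieService("http://localhost:11000/oozie");
    // submit the coordinator: it stays in PREP until its start time
    String jobId = service.submitJob("coordinatorjob.properties");
    System.out.println("Submitted coordinator job: " + jobId);
    System.out.println("Status: " + service.getJobStatus(jobId));
  }
}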

Conclusion

The Oozie Coordinator can be used for efficient scheduling of Hadoop-related workflows. It also helps in triggering them on the basis of data availability or external events. Moreover, it provides a lot of configurable and pluggable components, which helps in the easy and effective deployment and maintenance of Oozie workflow jobs.
As the coordinator is specified in XML, it is easy to integrate with J2EE applications. Invoking coordinator jobs through Java has already been explained above.

Enhancements

Oozie provides a new component, “Bundle”, in its latest version 3. It provides a higher-level abstraction in which it creates a set of coordinator applications, often called a data pipeline. Data dependencies can be inserted between multiple coordinator jobs to create an implicit data application pipeline. Oozie lifecycle operations (start/stop/suspend/resume/rerun) can also be applied at the bundle level, which results in better and easier operational control.

References

[1] Oozie Yahoo! Workflow Engine for Hadoop: http://incubator.apache.org/oozie/docs/3.1.3-incubating/docs/
[2] Oozie Coord Use Cases: https://github.com/yahoo/oozie/wiki/Oozie-Coord-Use-Cases
[3] Better Workflow Management in CDH Using Oozie 2: https://github.com/yahoo/oozie/wiki/Oozie-Coord-Use-Cases
[5] Apache Hadoop: http://hadoop.apache.org/
[6] Index of public/org/apache/hadoop/hadoop-examples/0.20.2-cdh3u0:
https://repository.cloudera.com/artifactory/public/org/apache/hadoop/hadoop-examples/0.20.2-cdh3u0/