Tuesday, 21 October 2014

Hadoop Tutorials: Ingesting XML in Hive using XPath

In the first of my series of Hadoop tutorials, I wanted to share an interesting case that arose when I was experiencing poor performance while running queries and computations on a set of XML data. The computations could be mathematical as well as statistical, so the data needed to be ingested into a platform that could handle huge amounts of data and be queried easily. The tool we had been using to process this data was slow and expensive, so we needed to come up with a more cost-effective solution.

Since we are already in the Hadoop world, we decided to use either Hive or Pig. This would be cost effective and yield good performance, since it would benefit from Hadoop's distributed storage and processing. The end users were more comfortable with SQL, so we decided to go with Hive. XML can be ingested directly into Hive using XPath, but a problem arises when you have a few hundred fields for which you need to generate XPath expressions. Even though XPath is an excellent way to read from an XML file, the user still has to manually specify every tag that is to be read.

The solution was to write a piece of code that would go through a portion of the XML file containing a few records and spit out the XPath expressions. Each XML tag can contain multiple tags, so we had to loop through nested tags and maintain counters on the parent as well as the child tags. In any XML file there are also some parent tags that hold no value themselves but just encapsulate other child tags that do; these need to be separated out.

Here is a sample of the input XML file:

<policy><policyLimit>1000</policyLimit><versionNumber>1</versionNumber><customerNumber>12345</customerNumber><vehicleCoverage><coverageLimit>500</coverageLimit><coverageCode>ABC</coverageCode></vehicleCoverage></policy>

Here we have policy as the parent tag, within which we have policyLimit through vehicleCoverage. vehicleCoverage is a child of policy but a parent to other tags such as coverageLimit and coverageCode. We need to make sure that none of these parent-only tags show up in our final XPath list. We also need to specify the base node tag for our input XML, which in this case is policy.

Here is sample Java code that iterates through the XML file and builds the three things listed after the code:

// Requires: java.io.BufferedWriter, java.io.File, java.io.FileWriter, java.util.HashSet,
// java.util.Hashtable, javax.xml.parsers.DocumentBuilder, javax.xml.parsers.DocumentBuilderFactory,
// org.w3c.dom.Node, org.w3c.dom.NodeList
HashSet<String> unwantedParentTags = new HashSet<String>();
Hashtable<String, Integer> nodeCounters = new Hashtable<String, Integer>();
String currentNode = null;
Integer currentNodeCounter = 0;

// inputPath points to the sample XML file, outputXPathTempPath to the file
// that will hold the generated XPath expressions.
DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
org.w3c.dom.Document inFile = db.parse(new File(inputPath));
BufferedWriter outXPath = new BufferedWriter(new FileWriter(outputXPathTempPath));

// Level 1: iterate over the direct children of the base node ("policy").
NodeList base = inFile.getElementsByTagName("policy");
Node baseNode = base.item(0);
NodeList children = baseNode.getChildNodes();
for (int i = 0; i < children.getLength(); i++) {
    Node item = children.item(i);
    if (item.getNodeType() == Node.ELEMENT_NODE) {
        if (item.getPreviousSibling() == null) {
            // First child of the base node.
            currentNode = "xpath_string(policyxml, '/" + baseNode.getNodeName() + "/" + item.getNodeName()
                    + "') as " + item.getNodeName().toLowerCase();
            if (nodeCounters.containsKey(currentNode)) {
                // Repeated tag: bump the counter and emit an indexed XPath expression.
                currentNodeCounter = nodeCounters.get(currentNode);
                nodeCounters.remove(currentNode);
                nodeCounters.put(currentNode, currentNodeCounter + 1);
                outXPath.write("xpath_string(policyxml, '/" + baseNode.getNodeName() + "[" + (currentNodeCounter + 1) + "]"
                        + "/" + item.getNodeName() + "') as " + item.getNodeName().toLowerCase() + (currentNodeCounter + 1));
                outXPath.newLine();
            } else {
                nodeCounters.put(currentNode, 1);
                outXPath.write("xpath_string(policyxml, '/" + baseNode.getNodeName() + "[1]"
                        + "/" + item.getNodeName() + "') as " + item.getNodeName().toLowerCase() + "1");
                outXPath.newLine();
            }
        } else {
            if (!(item.getPreviousSibling().toString().equalsIgnoreCase("[" + item.getNodeName() + ": null]"))) {
                currentNode = "xpath_string(policyxml, '/" + baseNode.getNodeName() + "/" + item.getNodeName()
                        + "') as " + item.getNodeName().toLowerCase();
                if (nodeCounters.containsKey(currentNode)) {
                    currentNodeCounter = nodeCounters.get(currentNode);
                    nodeCounters.remove(currentNode);
                    nodeCounters.put(currentNode, currentNodeCounter + 1);
                    outXPath.write("xpath_string(policyxml, '/" + baseNode.getNodeName() + "[" + (currentNodeCounter + 1) + "]"
                            + "/" + item.getNodeName() + "') as " + item.getNodeName().toLowerCase() + (currentNodeCounter + 1));
                    outXPath.newLine();
                } else {
                    nodeCounters.put(currentNode, 1);
                    outXPath.write("xpath_string(policyxml, '/" + baseNode.getNodeName() + "[1]"
                            + "/" + item.getNodeName() + "') as " + item.getNodeName().toLowerCase() + "1");
                    outXPath.newLine();
                }
            } else {
                // Tag that only wraps other tags: remember it so its XPath can be filtered out later.
                if (!unwantedParentTags.contains(baseNode.getNodeName() + "/" + item.getNodeName())) {
                    unwantedParentTags.add(baseNode.getNodeName() + "/" + item.getNodeName());
                    currentNode = "xpath_string(policyxml, '/" + baseNode.getNodeName() + "/" + item.getNodeName()
                            + "') as " + item.getNodeName().toLowerCase();
                    nodeCounters.remove(currentNode);
                }
            }
        }
    }
}
outXPath.close();

  1. Counters on parent and child tags
  2. A list of unwanted parent tags
  3. Fully qualified XPath expressions in the format basenode/childnode

The code shown above only works at the first level while iterating through the XML; it can be modified to recursively loop through each level. Once we have the complete list of XPath expressions along with the unwanted parent tags, the next step is to iterate through the expression list and remove every expression that corresponds to an unwanted parent tag.
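Here is a minimal sketch of that filtering step. It assumes the generated expressions were also collected into an in-memory list (called xpathExpressions here purely for illustration) alongside the unwantedParentTags set built above, rather than only being written to the output file.

// Hypothetical post-processing: keep only the expressions that do not point at
// a wrapper tag recorded in unwantedParentTags (requires java.util.List and java.util.ArrayList).
List<String> filteredExpressions = new ArrayList<String>();
for (String expression : xpathExpressions) {
    // Strip the positional markers so '/policy[1]/vehicleCoverage' can be
    // compared with the stored form 'policy/vehicleCoverage'.
    String normalized = expression.replaceAll("\\[\\d+\\]", "");
    boolean isUnwantedParent = false;
    for (String parentTag : unwantedParentTags) {
        // Match only when the path ends at the parent tag itself, so children
        // such as '/policy/vehicleCoverage/coverageCode' are kept.
        if (normalized.contains("/" + parentTag + "'")) {
            isUnwantedParent = true;
            break;
        }
    }
    if (!isUnwantedParent) {
        filteredExpressions.add(expression);
    }
}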

Now that we have our final list of XPath expressions, the only step left is to write the Hive script that reads from our XML file. Before doing that, we must pre-process the raw XML into a set of Hive-friendly, newline-terminated XML records, cleansing embedded newlines and other formatting. The source XML in our case contains formatting whitespace and newlines for readability, so we delete literal ampersands, replace newlines and carriage returns with spaces, insert a newline after each record-level closing tag, and drop any blank lines:

cat <Input File Name>.xml | tr -d '&' | tr '\n' ' ' | tr '\r' ' ' | sed 's|</policy>|</policy>\n|g' | grep -v '^\s*$' > <Output File Name>.xml

Once we have the list of XPath expressions and a processed XML file, we can write the Hive script, which has the following parts:

  1. Create a Hive table on top of the XML flat file. Here each table record is a single string column that contains the XML text for one XML record.
  2. Create a view that reads from the table above and incorporates all the needed XPath expressions.
  3. Create a table of persisted data, since views may have performance issues.

Here is a sample of the Hive script:

CREATE EXTERNAL TABLE xpath_table (policyxml string)
STORED AS TEXTFILE
LOCATION '/hdfs/path/to/flat/processed/file';

CREATE VIEW xpath_view(policylimit, coveragecode)
AS SELECT
xpath_string(policyxml, '/policy[1]/policyLimit'),
xpath_string(policyxml, '/policy[1]/vehicleCoverage[1]/coverageCode')
FROM xpath_table;

CREATE TABLE xpath_table_final AS SELECT * FROM xpath_view;

SELECT * FROM xpath_table_final WHERE coveragecode = 'ABC';


Also, retaining the original source XML allows us to create specific XPath views to fulfill different requirements. We ran multiple queries on the resultant table, including joins, averages, sums and so on, and got the desired output. The Java code that generates the XPath expressions takes only a few seconds since it works on a very small sample, while the table creation time depends entirely on the cluster configuration.

Parquet – columnar storage for Hadoop

Parquet is a columnar storage format for Hadoop that uses the concept of repetition/definition levels borrowed from Google Dremel. It provides efficient encoding and compression schemes, and the efficiency is improved because they are applied on a per-column basis (compression is better since the values in a column are all of the same type, and encoding is better since values within a column are often the same or repeated). Here is a nice blog post from Julien describing Parquet internals.
Parquet can be used by any project in the Hadoop ecosystem; integrations are provided for M/R, Pig, Hive, Cascading and Impala.
I am by no means an expert at this, and a lot of what I write here is based on my conversations with a couple of key contributors on the project (@J_ and @aniket486). Also, most of the content in this post is based on the Pig+Parquet integration. We at Salesforce.com have started using Parquet for application log processing with Pig and are encouraged by the preliminary performance results.
Writing a Parquet file
There is parquet.hadoop.ParquetWriter. You need to decide which object model you want to use: it could be Thrift, Avro, Pig or the example model. Here is a function for writing a file using the Pig model (TupleWriteSupport):
private void write(String pigSchemaString, String writePath) throws Exception {
    // Parse the Pig schema string and build the Pig object model write support.
    Schema pigSchema = Utils.getSchemaFromString(pigSchemaString);
    TupleWriteSupport writeSupport = new TupleWriteSupport(pigSchema);

    // Remove any previous output at the target path.
    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path(writePath);
    if (fs.exists(path)) {
        fs.delete(path, true);
    }

    ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(path, writeSupport,
            CompressionCodecName.UNCOMPRESSED, ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE, false, false);

    // Write NUM_RECORDS tuples, one integer value per schema field.
    TupleFactory tf = TupleFactory.getInstance();
    for (int i = 0; i < NUM_RECORDS; i++) {
        Tuple t = tf.newTuple();
        for (int j = 0; j < pigSchema.size(); j++) {
            t.append(i + j);
        }
        writer.write(t);
    }
    writer.close();
}
"pigSchemaString" is the schema for the Parquet file. This can be any valid Pig schema, such as "a:int, b:int, c:int". Note that I insert integer values into the tuples, which is why the schema fields are defined as int.
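For example, the helper above might be invoked like this (the schema string and output path are placeholders, not values from the original test):

// Hypothetical call to the write(...) helper shown above.
write("a:int, b:int, c:int", "/tmp/parquet_write_demo");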
So what exactly happened during the write? I use TupleWriteSupport, which is a WriteSupport implementation that helps us write Parquet files compatible with Pig. I then use ParquetWriter, passing in a few arguments:
  1. path – file path to write to
  2. writeSupport – TupleWriteSupport in this case
  3. compressionCodecName – could be UNCOMPRESSED, GZIP, SNAPPY, LZO
  4. blockSize – the total size used by a block; 128MB by default
  5. pageSize –  from the parquet docs: “pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk.  We recommend 8KB for page sizes.” Default page size is 1MB
  6. enableDictionary – turn on/off dictionary encoding
At the end, I create tuples with a few elements and write them to the Parquet file.
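To make the parameter list above concrete, here is what a writer with non-default choices might look like. This is only a sketch: it reuses the same constructor and the path/writeSupport objects from the write() method above, with Snappy compression, the default block size, the 8KB page size recommended in the docs, and dictionary encoding turned on.

// Sketch: same constructor as in write(), different settings.
ParquetWriter<Tuple> snappyWriter = new ParquetWriter<Tuple>(
        path,
        writeSupport,
        CompressionCodecName.SNAPPY,       // compressionCodecName
        ParquetWriter.DEFAULT_BLOCK_SIZE,  // blockSize (128MB default)
        8 * 1024,                          // pageSize (8KB, as recommended above)
        true,                              // enableDictionary
        false);                            // final flag, kept false as in the original call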
Reading a Parquet file
There is a bug due to which reading Parquet files written using the Pig model (TupleWriteSupport) directly with ParquetReader fails. See https://github.com/Parquet/parquet-mr/issues/195
However, you can use Pig to read the file.
A = load 'parquet_file' USING parquet.pig.ParquetLoader();

describe A;

{a: int, b: int, c: int}

B = foreach A generate a;

dump B;
Initial Findings
I ran the PerfTest and noticed that reading 1 column took longer than reading more columns, which seemed odd:
Parquet file read took : 1790(ms) for 1 columns
Parquet file read took : 565(ms) for 2 columns
Parquet file read took : 560(ms) for 3 columns
Parquet file read took : 544(ms) for 4 columns
Parquet file read took : 554(ms) for 5 columns
Parquet file read took : 550(ms) for 10 columns
Parquet file read took : 545(ms) for 20 columns
Parquet file read took : 563(ms) for 30 columns
Parquet file read took : 1653(ms) for 40 columns
Parquet file read took : 1049(ms) for 50 columns
That’s the JVM warmup phase!
The JIT compiler will inline methods based on how often they are called, so it waits to see what gets called often before doing so. Julien suggested I run the test twice in a row (in the same process) and compare the times. This is generic and nothing particular to Parquet, but I wanted to highlight it in case you run into similar perf numbers.
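A minimal sketch of that warm-up pattern, with readParquetColumns(...) standing in as a hypothetical helper for the actual PerfTest read:

// Time the same read several times inside one JVM so the JIT can compile the
// hot paths before the measurement you actually care about.
for (int iteration = 0; iteration < 3; iteration++) {
    for (int columns : new int[] {1, 2, 3, 4, 5, 10, 20, 30, 40, 50}) {
        long start = System.currentTimeMillis();
        readParquetColumns(columns);   // hypothetical helper wrapping the Parquet read
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("Parquet file read took : " + elapsed + "(ms) for "
                + columns + " columns, iteration " + iteration);
    }
}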
Voila! The 2nd iteration did provide better results.
Parquet file read took : 1809(ms) for 1 columns, iteration 0
Parquet file read took : 563(ms) for 2 columns, iteration 0
Parquet file read took : 562(ms) for 3 columns, iteration 0
Parquet file read took : 548(ms) for 4 columns, iteration 0
Parquet file read took : 548(ms) for 5 columns, iteration 0
Parquet file read took : 554(ms) for 10 columns, iteration 0
Parquet file read took : 548(ms) for 20 columns, iteration 0
Parquet file read took : 550(ms) for 30 columns, iteration 0
Parquet file read took : 1603(ms) for 40 columns, iteration 0
Parquet file read took : 1054(ms) for 50 columns, iteration 0
Parquet file read took : 536(ms) for 1 columns, iteration 1
Parquet file read took : 531(ms) for 2 columns, iteration 1
Parquet file read took : 527(ms) for 3 columns, iteration 1
Parquet file read took : 532(ms) for 4 columns, iteration 1
Parquet file read took : 527(ms) for 5 columns, iteration 1
Parquet file read took : 533(ms) for 10 columns, iteration 1
Parquet file read took : 537(ms) for 20 columns, iteration 1
Parquet file read took : 534(ms) for 30 columns, iteration 1
Parquet file read took : 538(ms) for 40 columns, iteration 1
Parquet file read took : 1966(ms) for 50 columns, iteration 1
Parquet file read took : 523(ms) for 1 columns, iteration 2
Parquet file read took : 525(ms) for 2 columns, iteration 2
Parquet file read took : 691(ms) for 3 columns, iteration 2
Parquet file read took : 529(ms) for 4 columns, iteration 2
Parquet file read took : 530(ms) for 5 columns, iteration 2
Parquet file read took : 531(ms) for 10 columns, iteration 2
Parquet file read took : 532(ms) for 20 columns, iteration 2
Parquet file read took : 532(ms) for 30 columns, iteration 2
Parquet file read took : 1032(ms) for 40 columns, iteration 2
Parquet file read took : 1044(ms) for 50 columns, iteration 2
However, it's interesting to note that there isn't a huge difference in read time for 1 column versus more. This can be attributed to the fact that this test ran on a dataset with only 100k rows (a few MBs). As explained later, the advantages of the Parquet format are more apparent with large files.
I ran another test – reading a regular text file vs a Parquet file. The file contained 20M rows and 6 columns. The results didn't seem right, either storage- or processing-wise. Was I missing something?
Text file size : 513M
Parquet file size : 895M

Time taken to read, Text : 13784ms
Time taken to read, Parquet: 19551ms
By default compression is not enabled, which is why the Parquet file is larger (footers, headers and summary files take up additional space; note that Parquet stores information about each page, column chunk and file so it can determine the exact pages that need to be loaded by a query; you can find additional info here).
Also, if you are reading all the columns, it is expected that the columnar format will be slower, since row storage is more efficient when you read every column. Project fewer columns and you should see a difference, along with a projection pushdown message in the logs.
Yes, you need bigger files to get the benefits of columnar storage.
At this point, I wanted to try out encodings and see how they play out on overall storage. My next question – are different encodings (RLE, dictionary) to be provided by the client, or does Parquet figure out the right one to use based on the data? It turns out Parquet will use dictionary encoding if it can, but right now you need to turn that on: http://parquet.io/parquet-mr/site/1.0.0-SNAPSHOT/apidocs/index.html
parquet.enable.dictionary=true
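One way to pass that flag is through the Hadoop job configuration; a minimal sketch, with the surrounding job setup omitted (the property key is the one shown above):

// Enable dictionary encoding via the Hadoop Configuration
// (org.apache.hadoop.conf.Configuration) handed to the job that writes Parquet.
Configuration conf = new Configuration();
conf.setBoolean("parquet.enable.dictionary", true);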
I finally did get some nice results after enabling dictionary encoding and filtering on a single column. Storage was a lot better too once dictionary encoding was enabled. (The following was run on a larger dataset.)
1st iteration:

Text file read took : 42231ms
Parquet file read took : 27853ms

Here are the numbers from the 2nd iteration - just to negate the effects of JVM warmup, and to be fair to the text/row format :)

2nd iteration:

Text file read took : 36555ms
Parquet file read took : 27669ms
Schema Management
Parquet can handle multiple schemas. This is important for our log-processing use case at SFDC. We have several different types of logs, each with its own schema, and we have a few hundred of them. Most Pig queries run against a few log types. Parquet merges schemas and provides the ability to parse out columns from different files.
LogType A : organizationId, userId, timestamp, recordId, cpuTime
LogType V : userId, organizationId, timestamp, foo, bar
A query that tries to parse the organizationId and userId from the two log types should be able to do so correctly, even though the fields are positioned differently in the two schemas. With Parquet, that's not a problem: it merges the 'A' and 'V' schemas and projects the columns accordingly. It does this by maintaining a per-file schema in addition to the merged schema and resolving the columns by referencing the two.
Projection Pushdown
One of the advantages of a columnar format is that it can read only those parts of a file that are necessary. The columns that are not required are never read, avoiding unnecessary and expensive I/O.
To do this in Pig, just pass the required schema into the constructor of ParquetLoader.
A = LOAD '/parquet/file' USING parquet.pig.ParquetLoader('a:int, b:int');
The above query loads columns 'a' and 'b' only. When you do so, you should find a message similar to the following in the logs:
Assembled and processed 4194181 records from 2 columns in 2124 ms: 1974.6615 rec/ms, 1974.6615 cell/ms
Had you not done that, all 16 columns of the file would be loaded:
Assembled and processed 4194181 records from 16 columns in 6313 ms: 664.3721 rec/ms, 10629.953 cell/ms
Summary Files
Parquet generates a summary file for all the part files generated under a directory (job output). The summary file reduces the number of calls to the namenode and to individual slaves while producing the splits, which significantly reduces the latency to start a job. Without it, the footer of every part file has to be opened, which is occasionally slowed down by the namenode or by a bad slave that we happen to hit. Reading one summary file reduces both the risk of hitting a slow slave and the load on the namenode.
For example, say the output directory to which Parquet files are written by a Pig script is '/user/username/foo':
STORE someAlias INTO '/user/username/foo' using parquet.pig.ParquetStorer();
This will create part files under 'foo'; the number of part files depends on the number of reducers.
/user/username/foo/part-r-00000.parquet

/user/username/foo/part-r-00001.parquet

/user/username/foo/part-r-00002.parquet

/user/username/foo/part-r-00003.parquet
The summary file is generated when the Hadoop job writing the files finishes, since it is created in the output committer of the output format (ParquetOutputCommitter.commitJob). The committer reads all the footers in parallel and creates the summary file, so all subsequent loads or reads of the directory 'foo' can be more efficient.
There is one summary file for all the part files output by the same job, that is, one per directory containing multiple part files.
Hadoop Compatibility
Anyone who has been part of a major Hadoop upgrade should be familiar with how painful the process can be. At SFDC, we moved from a really old version, 0.20.2, to 2.x (recently declared GA). This involved upgrading a ton of dependencies, making client-side changes to use the newer APIs, a bunch of new configurations, and eliminating a whole lot of deprecated stuff. Though this was a major upgrade and most upgrades from here on should be smooth(er), it always helps if dependent and third-party libraries don't need to be recompiled.
With Parquet, you should not need to recompile for Hadoop 2. It hides all the Hadoop 2 incompatibilities behind reflective calls, so the same jars will work.
And finally ..
We at Salesforce.com have been early adopters of several big data open source technologies. Hadoop, Pig, HBase, Kafka, Zookeeper and Oozie, to name a few, either have been or are in the process of making it to production. Phoenix, a SQL layer on top of HBase, is a project that was homegrown and is now open-sourced. Parquet is the latest addition, and we are looking forward to using it for more datasets in the near future (Oracle exports, for example), not just application logs. The Parquet community is helpful and open to new ideas and contributions, which is great for any open source project.