Friday 17 October 2014

Speeding up a Pig+HBase MapReduce job by a factor of 15

The other day I ran a Pig script. Nothing fancy: I loaded some data into HBase and then ran a second Pig job to do some aggregations. I knew the data loading would take some time, as it was multiple GB of data, but I expected the second aggregation job to run much faster. It ran for over 15 hours and was still not done, which was too long in my mind, so I terminated it. I was using Amazon Elastic MapReduce as my Hadoop environment, so I could have added more resources, but I wasn’t sure that would do me any good. I took a look at the performance breakdowns, and within minutes I found the problem. Not only that, I realized that even an expert would have had a hard time identifying it. What’s more, the go-to fix for more performance in Hadoop, adding more hardware, would not have helped at all!

Pig vs. Hive

Pig is one of the two predominant ways to access, transform and query data in Hadoop. Whereas Hive uses a SQL-like approach that appeals to database people and analysts, Pig is a scripting language. Its procedural approach and its ability to transform data in a pipeline fashion appeal to ETL (Extract, Transform, Load) experts. Pig is typically used to load data into Hadoop and to perform complex transformations and aggregations that would be hard to do in a single Hive query. Both systems generate one or more MapReduce jobs, and therein lies their power and complexity.

My MapReduce Job

For demonstration purposes I simply used a kmer index job to reproduce the issue. If you are interested in running it yourself, get it from: https://github.com/lila/emr-hbase-sample
The job consists of two Amazon EMR steps in a single job flow. The first Pig script loads multiple gigabytes of genome sequence data into a single HBase table (compared to the original sample, I added some extra FASTA genome files to get to that size). The second Pig job produces a size-based index. The respective Pig Latin looks like this:
A = LOAD 'hbase://kmer'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'sequence:*', '-loadKey true')
AS (id:bytearray, sequence:map[]);
B = foreach A generate id, SIZE(sequence);
C = group B by $1 PARALLEL $p;
D = foreach C generate group, COUNT(B);
I ran it on an empty HBase table and looked at the performance results.

The Loading Job

As mentioned, the first step was a Pig job that loaded several gigabytes of data into a simple HBase table. Looking at it, we can see a couple of things immediately:
The MapReduce Task is sending inserts to multiple HBase Region servers, thus not storing the data locally
First of all, the HBase data is not stored locally. This is not much of a surprise: the Pig script loads data from a file, and the work is distributed evenly across the Hadoop cluster based on that input data. That distribution is of course not the same as the resulting region distribution in HBase. Secondly, we notice that for the first 10 minutes only a single HBase region server is hit, then a second, and only after 14 minutes do all 5 HBase region servers process data.
At the beginning of the job only a single HBase region Server consumes CPU while all others remain idle
This is also well documented and has to do with HBase regions and splits (read this very good blog by Hortonworks if you want to know more).
And lastly we also see that this was a map-only job.
Looking at the breakdown in time, we see that the kmerLoad jobs do not spend any time in reducing or shuffling but only in the mapping phase
More specifically, we spend nearly all of our time in HBase. As we are loading data into HBase, we would expect it to be the primary contributor.
This breakdown shows that all map tasks are busy and spend nearly all their time sending data to HBase
Next I took a look at the HBase inserts. We can see how many items we processed and inserted into HBase:
This shows the inserts per region server and HBase table/column family. We also see the number of roundtrips compared to the number of rows inserted
A couple of things are interesting here. First of all, we see that the number of inserts is not equally distributed across the region servers. That is again due to the fact that the table was empty and HBase started with a single region (see the previous screenshots and the Hortonworks blog). We can also compare the number of inserted rows with the number of roundtrips to make sure that we leverage batching correctly.
When comparing this with the number of mapped records we see that we are doing roughly 27 inserts per input record and each output record corresponds to a single HBase row.
This shows details about the MapReduce job and several counters. In this case we see the mapped records.
In summary, we can say that the loading is already pretty optimal: the CPU load on the cluster is fine and we insert ~1700 rows into HBase per roundtrip.
This shows a single task attempt inserting rows into HBase; we can see that it inserts many rows in a single roundtrip
Together with the fact that HBase really was the main contributor here, this means that there was not much I could do to speed things up, at least not without some serious expert help (suggestions on how to improve this beyond pre-splitting are welcome). So let’s move on to the index job.
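As an aside for readers who want to try the pre-splitting mentioned above: a minimal sketch of how split points could be generated, assuming (and this is only an assumption, not something I verified for this table) that row keys start with characters from the DNA alphabet A, C, G, T. The helper names `split_keys` and `create_statement` are made up for illustration; the output is an HBase shell `create` statement with a `SPLITS` list for the `kmer` table and `sequence` column family used in this post.

```python
from itertools import product
from typing import List


def split_keys(alphabet: str, prefix_len: int, regions: int) -> List[str]:
    """Return regions-1 evenly spaced key prefixes to use as split points.

    Assumption: row keys begin with characters from `alphabet` and are
    spread roughly uniformly over all prefixes of length `prefix_len`.
    """
    # product() over a sorted alphabet yields prefixes in lexicographic order.
    prefixes = ["".join(p) for p in product(sorted(alphabet), repeat=prefix_len)]
    if not 1 <= regions <= len(prefixes):
        raise ValueError("regions must be between 1 and the number of prefixes")
    # Pick every (len/regions)-th prefix as a region boundary.
    return [prefixes[i * len(prefixes) // regions] for i in range(1, regions)]


def create_statement(table: str, family: str, keys: List[str]) -> str:
    """Format an HBase shell statement that creates a pre-split table."""
    quoted = ", ".join("'{}'".format(k) for k in keys)
    return "create '{}', '{}', SPLITS => [{}]".format(table, family, quoted)


if __name__ == "__main__":
    # Five regions to match the five region servers in this cluster.
    print(create_statement("kmer", "sequence", split_keys("ACGT", 2, 5)))
```

Whether this helps depends entirely on how the real row keys are distributed, so treat the split points as a starting guess rather than a recommendation.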

The Index Job

The second job created a size-based index over all inserted data; hence it had to read all data in the table. This second step ran for 15 hours without completing, at which point I terminated it. By analyzing it I saw the root cause almost immediately.
The Database Overview shows a single Scan that makes a roundtrip to the HBase region server for every single row it retrieves
Look at the roundtrips and the row counts processed. There is one more roundtrip than there are rows! That means the MapReduce job is calling out to the HBase region server for every single row, which is very inefficient.
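To make the cost of this concrete, here is a rough back-of-the-envelope model, not a description of the HBase client internals. It assumes each scanner call returns up to `caching` rows and that one extra call is needed to find out the scan is exhausted, which matches the "rows plus one" pattern seen above. The row count and the per-roundtrip latency in the example are hypothetical numbers chosen for illustration.

```python
import math


def scan_roundtrips(rows: int, caching: int) -> int:
    """Estimate roundtrips for one scanner reading `rows` rows.

    Assumes every call fetches up to `caching` rows, plus one final
    call that comes back empty and signals the end of the scan.
    """
    if rows < 0 or caching < 1:
        raise ValueError("rows must be >= 0 and caching must be >= 1")
    return math.ceil(rows / caching) + 1


def wait_seconds(rows: int, caching: int, roundtrip_ms: float) -> float:
    """Time spent purely waiting on roundtrips, ignoring transfer and CPU."""
    return scan_roundtrips(rows, caching) * roundtrip_ms / 1000.0


if __name__ == "__main__":
    rows = 1000000       # hypothetical row count
    roundtrip_ms = 1.0   # hypothetical latency per roundtrip
    for caching in (1, 100, 500):
        print(caching, scan_roundtrips(rows, caching),
              wait_seconds(rows, caching, roundtrip_ms))
```

With a caching value of 1 the number of roundtrips grows linearly with the number of rows, so the job is bound by network latency rather than by CPU or disk.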
The default scan caching on a Pig HBase job should be 100, but that was obviously not the case. The performance breakdown showed me that we were indeed spending all of our time waiting on HBase.
The HBase region Server contributes over 95% to our job execution time
This flow also shows that we spend 95% of our time waiting on HBase
Because we are making one network roundtrip for every single row, the utilization of my cluster was miserable.
The CPU utilization in the Hadoop cluster is bad; it only has a couple of spikes, which seem to coincide with HBase region splits
Therefore adding more hardware would not have helped much – it was not the lack of resources that made my indexing job slow.

Pig really doesn’t like HBase…

According to the documentation, HBaseStorage has an option to set the scanner caching, which should lead to fewer roundtrips and more rows fetched in a single roundtrip. This is what it looks like in the Pig script:
A = LOAD 'hbase://kmer'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'sequence:*', '-loadKey true -caching 500')
AS (id:bytearray, sequence:map[]);
B = foreach A generate id, SIZE(sequence);
C = group B by $1 PARALLEL $p;
D = foreach C generate group, COUNT(B);
I ran a smaller sample to test this option. However when I looked at the result I saw that it was still reading one row at a time.
We see that the HBase scan is still doing a roundtrip for every single row it retrieves
Now I was really curious. I added a couple of sensors to get more visibility, and what I found was that HBaseStorage did pick up my caching configuration option as defined in the script, but the actual HBase Scanner was still being configured to use 1 as its cache size!
The two selected rows show that while the Pig option (the first) retrieves the caching option defined in my pig script, the Scanner itself still gets Caching size 1 (second selected row)
I did not give up there though! Pig also has a way to set global MapReduce job options, which is what I tried next. This can be done by adding the following to the Pig script:
SET hbase.client.scanner.caching 1000;
It didn’t have any effect either. By looking at the JobTracker, I could verify that the setting made it into the job.xml submitted to Hadoop. I also verified that the map task itself was not aware of that setting at all.
This shows that the hbase.client.scanner.caching job option always returns 1, although the job.xml is set to 1000
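Checking what actually ended up in job.xml does not have to be done by eye. The following is a small sketch, assuming the usual Hadoop configuration layout of `<property>` elements with `<name>` and `<value>` children. The inline sample document and the job name in it are made up for illustration; only the `hbase.client.scanner.caching` entry mirrors what I set in my script.

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Made-up excerpt in the style of a Hadoop job.xml (illustration only).
SAMPLE_JOB_XML = """
<configuration>
  <property><name>mapred.job.name</name><value>kmerIndex</value></property>
  <property><name>hbase.client.scanner.caching</name><value>1000</value></property>
</configuration>
"""


def read_property(job_xml: str, name: str) -> Optional[str]:
    """Return the value configured for `name`, or None if it is not set."""
    root = ET.fromstring(job_xml.strip())
    for prop in root.iter("property"):
        if prop.findtext("name") == name:
            return prop.findtext("value")
    return None


if __name__ == "__main__":
    print(read_property(SAMPLE_JOB_XML, "hbase.client.scanner.caching"))
```

Note that this only confirms what was submitted with the job. As described above, the value the scanner actually used at runtime was a different matter.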
Finally, I launched a new Hadoop cluster via EMR and added the HBase option in the bootstrap script, making it a cluster-wide configuration setting:
--bootstrap-action s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hbase --args -s,hbase.client.scanner.caching=500
I ran the job again, and wow, was it fast. My sample subset went down from 9 minutes to 2! My overall map time went down from 7 minutes to 25 seconds!
This shows the same job executed twice, once with the caching option not working and once with a cluster-wide setting that did work. The red rectangle shows the dramatic effect this has on map time
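For the sample run, the speedup factors follow directly from the timings quoted above. The helper `speedup` is just a name I made up for this illustration.

```python
def speedup(before_seconds: float, after_seconds: float) -> float:
    """Return how many times faster the second run was."""
    if after_seconds <= 0:
        raise ValueError("after_seconds must be positive")
    return before_seconds / after_seconds


# Timings of the sample subset as reported in this post.
total_speedup = speedup(9 * 60, 2 * 60)  # whole job: 9 minutes vs. 2 minutes
map_speedup = speedup(7 * 60, 25)        # map time: 7 minutes vs. 25 seconds

if __name__ == "__main__":
    print(round(total_speedup, 1), round(map_speedup, 1))
```

The map phase, which is where the HBase scan happens, improved far more than the job as a whole, presumably because the remaining phases were not affected by the scanner setting.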
Satisfied that the scanner caching option finally worked, I ran my big job again. The improvement was massive. It went down from over 15 hours (where I terminated it!) to 75 minutes! That is over 15 times faster on the same hardware!
Looking at the transaction flow, we see that we still spend most of our time in HBase, but it went down from over 95% to 72%. The shift means that a lot more processing was done on the MapReduce side itself. To put this into context, a lot more time was spent in the aggregation part of the Pig script.
This transaction flow shows that we now spend only 72% of the job time waiting on HBase and much more time doing actual processing
Thus we’ve turned this job from one mostly waiting on HBase and network to one that is actually leveraging the CPU. As a result the utilization of my cluster is much better now, albeit far from optimal.
The CPU load on the Cluster is much better now, compared to before.
There is still a lot of room for improvement. CPU utilization of my Hadoop nodes is not close to 100%, so we might be able to make this even faster, but that is work for another day. What’s important is that without the insight that our APM solution gives us, I would never have known how to fix this!

Conclusion

While a Pig or Hadoop expert might have told me right away to set the caching option, even they wouldn’t have easily figured out why the option didn’t take effect. With APM I was able to verify the number of roundtrips and immediately saw that the option didn’t work; I didn’t need to spend hours or days to realize that. While I do not know why the option doesn’t work (any Hadoop developers here who can tell us?), I was able to remedy the situation and also verify that the site-wide option had the desired effect, with dramatic improvements!
When I ask customers how they deal with performance problems in MapReduce, they often tell me that beyond generic Hadoop tuning they don’t bother. They simply add more hardware, because they cannot have experts spending days or weeks browsing through logs or looking at every job they execute. In this case, adding more hardware would not have helped, as the existing cluster was heavily underutilized. By leveraging an APM solution I was able to find the root cause, see that the documented fix was not working, and come up with an alternative solution, all within an hour! I sped up my job by a factor of 15, without adding any more hardware to my cluster and without needing to have an expert go through log files!
