Friday, 17 October 2014

Eating our own dog food – 2x faster Hadoop MapReduce Jobs

For a while now I have been writing about how to analyze and optimize Hadoop jobs beyond just tweaking MapReduce options. The other day I took a look at some of our Outage Analyzer Hadoop jobs and put words into action.
A simple analysis of the Outage Analyzer jobs with Compuware APM 5.5 identified three hotspots and two potential Hadoop problems in one of our biggest jobs. It took the responsible developer a couple of hours to fix, and the result was a 2x improvement overall and a 6x improvement in the reduce part of the job. Let’s see how we achieved that.

About Outage Analyzer

Outage Analyzer is a free service provided by Compuware which displays, in real time, availability problems with the most popular 3rd party content providers on the Internet. It is available at http://www.outageanalyzer.com. It uses real-time analytics technologies for anomaly detection, event correlation and classification. Every day it stores billions of measurements taken from Compuware’s global testing network in Hadoop and runs different MapReduce jobs to analyze the data. I examined the performance of these MapReduce jobs.

Identifying worthwhile jobs to analyze

The first thing I did was look for a worthwhile job to analyze. To do this, I looked at cluster utilization broken down by user and job.
This chart visualizes the cluster CPU usage by user giving a good indication about which user executes the most expensive jobs
What I found was that John was the biggest user of our cluster. So I looked at the jobs John was running.
These are all the jobs that John ran over the last several days. It’s always the same one, consuming about the same amount of resources
The largest job by far was an analytics simulation of a full day of measurement data.  This job is run often to test and tune changes to the analytics algorithms.  Except for one of the runs, all of them lasted about 6.5 hours in real time and consumed nearly 100% CPU of the cluster during that time. This looked like a worthy candidate for further analysis.

Identifying which Job Phase to focus on

My next step was to look at a breakdown of the job from two angles: consumption and real time. From a real-time perspective, map and reduce took about the same amount of time – roughly 3 hours each. This could also be nicely seen in the resource usage of the job.
This dashboard shows the overall cluster utilization during the time the job ran. 100% of the cluster CPU is used during most of the time
The significant drop in Load Average and the smaller drop in the other charts mark the end of the mapping phase and the start of pure reducing. What is immediately obvious is that the reducing phase, while lasting about the same time, does not consume as many resources as the map phase. The Load Average is significantly lower and the CPU utilization drops in several steps before the job is finished.
On the one hand that is because we have a priority scheduler and reducing does not use all slots; more importantly, reducing cannot be parallelized as much as mapping. Every optimization here counts twofold, because you cannot scale the problem away! While the mapping phase is clearly consuming more resources, the reducing phase is a bottleneck and might therefore benefit even more from optimization.
The breakdown of job phase times shows that the mapping phase consumes twice as much time as reducing, even though we know that the job real time of the two phases is about the same - 3 hours each
As we can see, the Map Time (time we spend in the mapping function, excluding merging, spilling and combining) is twice as high as the reduce time. The reduce time here represents the time that tasks were actually spending in the reduce function, excluding shuffle and merge time (which is represented separately). As such, those two times represent the portions of the job that are directly impacted by the Map and Reduce code, which is usually custom code – and therefore tunable by our developers!

Analyzing Map and Reduce Performance

So as a next step I used Compuware APM to get a high-level performance breakdown of the job’s respective 3 hour mapping and reducing phases. A single click gave me this pretty clear picture of the mapping phase:
This is a hot spot analysis of our 3 hour mapping phase which ran across 10 servers in our hadoop cluster
The hot spot dashboard for the mapping phase shows that we spent the majority of the time (about 70%) in our own code and that it’s about 50% CPU time. This indicates a lot of potential for improvement. Next, I looked at the reducing phase.
This hot spot shows that we spend nearly all of our reducing time in all reduce tasks in our own code!
This shows that 99% of the reducing time is spent on our own code and not in any Hadoop framework.  Since the reduce phase was clearly the winner in terms of potential, I looked at the details of that hot spot – and immediately found three hot spots that were responsible for the majority of the reduce time.

Three simple code issues consume 70% of the reduce time

This is what the method hot spots dashboard for the reduce phase showed.
These are the method hot spots for the reduce phase, showing that nearly everything is down to only 3 line items
The top three items in the method hot spots told me everything I needed to know. As it turned out, nearly all other items listed were sub-hotspots of the topmost method:
  1. SimpleDateFormat initialization:
    The 5 items marked in red are all due to the creation of a SimpleDateFormat object. As most of us find out very painfully early in our careers as Java developers, SimpleDateFormat is not thread-safe and cannot easily be used as a static variable. This is why the developer chose the easiest route and created a new one for every call, leading to about 1.5 billion creations of this object. The initialization of the formatter is actually very expensive and involves resource lookups, locale lookups and time calculations (seen in the separate line items listed here). This item alone consumed about 40% of our reduce time.
    Solution: We chose to use the well-known Joda framework (the code replacement was easy) and made the Formatter a static final variable; totally removing this big hot spot from the equation.
  2. Regular Expression Matching (line two in the picture)
    In order to split the CSV input we were using java.lang.String.split. It is often forgotten that this method uses regular expressions underneath. RegEx is rather CPU intensive and overkill for such a simple job. This was consuming another 15-20% of the allotted CPU time.
    Solution: We changed this to a simple string tokenizer.
  3. Exception throwing (line three in the picture)
    This example was especially interesting. While reading input data we parse numeric values, and if the field is not a correct number java.lang.Long.parseLong throws a NumberFormatException. Our code would catch it, mark the field as invalid and ignore the exception. The fact is that nearly every input record in that feed has an invalid field or an empty field that should contain a number. Throwing this exception billions of times consumed another 10% of our CPU time.
    Solution: We changed the code in a way to avoid the exception altogether.
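The three fixes can be sketched as follows. This is a minimal illustration rather than our production code: it uses java.time's thread-safe DateTimeFormatter (the modern stdlib equivalent of the Joda formatter we adopted), a hypothetical CSV layout, and a parse helper that validates digits up front instead of throwing.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class RecordParser {
    // Fix 1: DateTimeFormatter is immutable and thread-safe, so a single
    // static final instance replaces billions of per-call allocations.
    private static final DateTimeFormatter DATE_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd");

    // Fix 2: split on a literal comma instead of String.split(), which
    // goes through the regular-expression machinery on every call.
    static String[] splitCsv(String line) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < line.length(); i++) {
            if (line.charAt(i) == ',') {
                fields.add(line.substring(start, i));
                start = i + 1;
            }
        }
        fields.add(line.substring(start));
        return fields.toArray(new String[0]);
    }

    // Fix 3: check the field before parsing so invalid or empty fields
    // return a default instead of raising a NumberFormatException per
    // bad record (overflow of extremely long digit runs aside).
    static long parseLongOrDefault(String s, long dflt) {
        if (s == null || s.isEmpty()) return dflt;
        int i = (s.charAt(0) == '-') ? 1 : 0;
        if (i == s.length()) return dflt;
        for (; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c < '0' || c > '9') return dflt;
        }
        return Long.parseLong(s);
    }

    public static void main(String[] args) {
        String[] f = splitCsv("2014-10-17,,42");
        LocalDate d = LocalDate.parse(f[0], DATE_FMT);
        System.out.println(d + " value=" + parseLongOrDefault(f[2], -1)
                + " missing=" + parseLongOrDefault(f[1], -1));
    }
}
```

The key point is that all three fixes remove per-record overhead that is invisible at small scale but dominates when a job processes billions of records.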
And there we have it – three simple hot spots were consuming about 70% of our reduce CPU time! During analysis of the mapping portion I found the same hot spots again, where they contributed about 20-30% to the CPU time.
I sent this analysis to the developer and we decided to eat our own dog food, fix it and rerun the job to analyze the changes.

Job done in half the time – 6-fold improvement in reduce time!

The result of the modified code exceeded our expectations by quite a bit! The immediate changes saw the job time reduced by 50%. Instead of lasting about 6.5 hours, it was done after 3.5! Even more impressive was that while the mapping time only went down by about 15%, the reducing time was slashed from 3 hours to 30 minutes!
This is the job’s cluster CPU usage and Load Average after we made the changes
The cluster utilization shows a very clear picture. The overall utilization and load average during the mapping phase actually increased a bit, and instead of lasting 3 hours 10 minutes it was done after 2 hours and 40 minutes. While not huge, this is still a 15% improvement.
The reduce phase on the other hand shrank dramatically: from roughly 3 hours to 30 minutes! That means a couple of hours of development work led to an impressive 6-fold performance improvement! We also see that the reduce phase is of course still not utilizing the whole cluster, and it’s actually the 100% phase that got a lot shorter.

Conclusion

Three simple code fixes resulted in a 100% improvement of our biggest job and a 6-fold speedup of the reduce portion. Suffice it to say that this totally surprised the owners of the job. The job was utilizing 100% of the cluster, which for them meant that from a Hadoop perspective things were running in an optimal fashion. While this is true, it doesn’t mean that the job itself is efficient!
This example shows that optimizing MapReduce jobs beyond tinkering with Hadoop options can lead to a lot more efficiency without adding any more hardware – achieving the same result with fewer resources!
The hot spot analysis also revealed some Hadoop-specific hotspots that led us to change some job options and speed things up even more. More on that in my next blog.

Top Performance Problems discussed at the Hadoop and Cassandra Summits

I recently attended the Hadoop and Cassandra Summits in the San Francisco Bay Area. It was rewarding to talk to so many experienced Big Data technologists in such a short time frame – thanks to our partners DataStax and Hortonworks for hosting these great events! It was also great to see that performance is becoming an important topic in the community at large. We got a lot of feedback on typical Big Data performance issues and were surprised by the performance-related challenges that were discussed. The practitioners here were definitely no novices, and the usual high-level generic patterns and basic cluster monitoring approaches were not on the hot list. Instead we found more advanced problem patterns – for both Hadoop and Cassandra.
I’ve compiled a list of the most interesting and most common issues for Hadoop and Cassandra deployments:

Top Hadoop Issues

Map Reduce data locality

Data locality is one of the key advantages of Hadoop MapReduce: the map code is executed on the same data node where the data resides. Interestingly, many people found that this is not always the case in practice. Some of the reasons they stated were:
  • Speculative execution
  • Heterogeneous clusters
  • Data distribution and placement
  • Data Layout and Input Splitter
The challenge becomes more prevalent in larger clusters: the more data nodes and data you have, the less locality you get. Larger clusters tend not to be completely homogeneous; some nodes are newer and faster than others, bringing the data-to-compute ratio out of balance. Speculative execution will attempt to use compute power even though the data might not be local. The nodes that contain the data in question might be computing something else, leading to another node doing non-local processing. The root cause might also lie in the data layout/placement and the Input Splitter used. Whatever the reason, non-local data processing puts a strain on the network, which poses a problem to scalability: the network becomes the bottleneck. Additionally, the problem is hard to diagnose because data locality is not easy to see.
To improve data locality, you need to first detect which of your jobs have a data locality problem or degrade over time. With APM solutions you can capture which tasks access which data nodes. Solving the problem is more complex and can involve changing the data placement and data layout, using a different scheduler or simply changing the number of mapper and reducer slots for a job. Afterwards, you can verify whether a new execution of the same workload has a better data locality ratio.
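As a starting point, Hadoop's built-in job counters (DATA_LOCAL_MAPS, RACK_LOCAL_MAPS, TOTAL_LAUNCHED_MAPS) already expose enough to compute a locality ratio per job. The sketch below is a hypothetical illustration of that check; the counter values are passed in as plain numbers so it runs without a cluster, and the 90% threshold is an assumption, not a standard.

```java
public class LocalityCheck {
    // On a real cluster these values would come from the finished job, e.g.
    //   job.getCounters().findCounter(JobCounter.DATA_LOCAL_MAPS).getValue()
    // Here they are plain parameters so the check runs standalone.
    static double localityRatio(long dataLocalMaps, long launchedMaps) {
        if (launchedMaps == 0) return 1.0;   // no maps launched: nothing to flag
        return (double) dataLocalMaps / launchedMaps;
    }

    public static void main(String[] args) {
        long dataLocal = 850, launched = 1000;   // hypothetical run
        double ratio = localityRatio(dataLocal, launched);
        // Flag jobs where fewer than 90% of map tasks ran data-local.
        if (ratio < 0.9) {
            System.out.printf("locality %.0f%% - investigate data placement%n",
                    ratio * 100);
        }
    }
}
```

Tracking this ratio per job over time is a cheap way to spot degradation before digging into placement, schedulers, or slot counts.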

Job code inefficiencies and “profiling” Hadoop workloads

The next item confirmed our own views and is very interesting: many Hadoop workloads suffer from inefficiencies. It is important to note that this is not a critique of Hadoop but of the jobs that run on it. However, “profiling” jobs in larger Hadoop clusters is a major pain point. Black-box monitoring is not enough, and traditional profilers cannot deal with the distributed nature of a Hadoop cluster. Our solution to this problem was well received by a lot of experienced Hadoop developers. We also received a lot of interesting feedback on how to make our Hadoop job “profiling” even better.

TaskTracker performance and the impact on shuffle time

It is well known that shuffle is one of the main performance critical areas in any Hadoop job. Optimizing the amount of map intermediate data (e.g. with combiners), shuffle distribution (with partitioners) and pure read/merge performance (number of threads, memory on the reducing side) are described in many Performance Tuning articles about Hadoop. Something that is less often talked about but is widely discussed by the long-term “Hadoopers” is the problem of a slowdown of particular TaskTrackers.
When particular compute nodes are under high pressure, have degrading hardware, or run into cascading effects, the local TaskTracker can be negatively impacted. To put it more simply: in larger systems, some nodes will degrade in performance!
The result is that the TaskTracker nodes cannot deliver the shuffle data to the reducers as fast as they should, or may react with errors while doing so. This has a negative impact on virtually all reducers, and because shuffle is a choke point the entire job time can and will increase. While small clusters allow us to monitor the performance of the handful of running TaskTrackers, real-world clusters make that infeasible. Monitoring with Ganglia based on averages effectively hides which jobs trigger this, which are impacted, and which TaskTrackers are responsible and why.
The solution to this is a baselining approach, coupled with a PurePath/PureStack model. Baselining of TaskTracker requests solves the averaging and monitoring problem and will tell us immediately if we experience a degradation of TaskTracker mapOutput performance. By always knowing which TaskTrackers slow down, we can correlate the underlying JVM host health and we are able to identify if that slowdown is due to infrastructure or Hadoop configuration issues or tied to a specific operating system version that we recently introduced. Finally, by tracing all jobs, task attempts as well as all mapOutput requests from their respective task attempts and jobs we know which jobs may trigger a TaskTracker slowdown and which jobs suffer from it.
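A minimal sketch of the baselining idea: keep an exponentially weighted moving average of mapOutput response times per TaskTracker, and flag a tracker whose latest sample exceeds its own baseline by some factor. The smoothing factor and threshold below are illustrative choices, not values from any product.

```java
import java.util.HashMap;
import java.util.Map;

public class TrackerBaseline {
    private static final double ALPHA = 0.05;   // EWMA smoothing, illustrative
    private static final double FACTOR = 3.0;   // "slow" = 3x own baseline
    private final Map<String, Double> baseline = new HashMap<>();

    /** Records one mapOutput response time; returns true if it looks degraded. */
    boolean record(String tracker, double millis) {
        Double avg = baseline.get(tracker);
        if (avg == null) {                      // first sample seeds the baseline
            baseline.put(tracker, millis);
            return false;
        }
        boolean slow = millis > avg * FACTOR;   // compare against own history
        baseline.put(tracker, (1 - ALPHA) * avg + ALPHA * millis);
        return slow;
    }

    public static void main(String[] args) {
        TrackerBaseline b = new TrackerBaseline();
        for (int i = 0; i < 100; i++) b.record("tt-07", 40.0);  // healthy history
        System.out.println("degraded: " + b.record("tt-07", 400.0));
    }
}
```

Because each tracker is compared against its own history rather than a cluster-wide average, one degraded node stands out instead of being hidden by the healthy majority.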

NameNode and DataNode slowdowns

Similar to the TaskTrackers and their effect on job performance, a slowdown of the NameNode or slowdowns of particular DataNodes have a deteriorating effect on the whole cluster. Requests can easily be baselined, making the monitoring and degradation detection automatic. Similarly, we can see which jobs and clients are impacted by the slowdown and the reason for the slowdown, be it infrastructure issues, high utilization or errors in the services.

Top Cassandra Issues

One of the best presentations about Cassandra performance was given by Spotify at the Cassandra Summit. If you use Cassandra or plan to, I highly recommend watching it!

Read Time degradation over time

As it turns out, Cassandra is always fast when first deployed, but there are many cases where read time degrades over time. Virtually all of these cases center around the fact that, over time, rows get spread out over many SSTables and/or accumulate deletes, which lead to tombstones. All of these cases can be attributed to wrong access patterns and wrong schema design, and are often data specific. For example, if you write new data to the same row over a long period of time (several months), then this row will be spread out over many SSTables. Access to it will become slow, while access to a “younger” row (which will reside in only one SSTable) will still be snappy. Even worse is a delete/insert pattern: adding and removing columns to the same row over time. Not only will the row be spread out, it will be full of tombstones, and read performance will be quite horrible. The result is that the average performance might degrade only slightly over time (averaging effect), when in reality the performance of the older rows degrades dramatically while the younger rows stay fast.
To avoid this, never delete data as a general pattern in your application, and never write to the same row over long periods of time. To catch such a scenario you should baseline Cassandra read requests on a per-column-family basis. Baselining approaches, as compared to averages, will detect a change in distribution and notify you if a percentage of your requests degrade while the majority, or some, stay super fast. In addition, by tying the Cassandra requests to the actual types of end-user requests, you will be able to quickly figure out where that access anti-pattern originates.
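One common way to avoid writing to the same row forever is to bucket the row key by time, so each row only accumulates data for a bounded window and naturally "closes" when the window ends. A sketch of such a key scheme; the sensor-id naming, the daily bucket width, and the key format are all illustrative assumptions:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketedKey {
    // One row per entity per UTC day: after a day, writes move to a new
    // row, so no single row grows (or collects tombstones) indefinitely.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    static String rowKey(String sensorId, Instant timestamp) {
        return sensorId + ":" + DAY.format(timestamp);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2013-07-01T10:15:30Z");
        System.out.println(rowKey("sensor-42", t)); // sensor-42:2013-07-01
    }
}
```

Reads for a time range then touch a small, predictable set of rows, and old buckets can be expired wholesale instead of via per-column deletes that leave tombstones behind.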

Some slow Nodes can bring down the cluster

Like every real-world application, Cassandra nodes can slow down due to many issues (hardware, compaction, GC, network, disk, etc.). Cassandra is a clustered database where every row exists multiple times in the cluster, and every write request is sent to all nodes that contain the row (even at consistency level ONE). It is no big deal if a single node fails, because others have the same data; all read and write requests can still be fulfilled. In theory a super slow node should not be a problem unless we explicitly request data with consistency level ALL, because Cassandra returns once the required number of nodes has responded. However, internally every node has a coordinator queue that waits for all requests to finish, even though the node responds to the client before that has happened. That queue can fill up because of one super slow node, effectively rendering a single node unable to respond to any requests. This can quickly lead to the complete cluster not responding to any requests.
The solution to this is twofold. If you can, use a token-aware client like Astyanax. By talking directly to the nodes containing the data item, this client effectively bypasses the coordinator problem. In addition, you should baseline the response time of Cassandra requests on the server nodes and alert yourself if a node slows down. Funnily enough, bringing down the slow node solves the problem temporarily, because Cassandra deals with a downed node nearly instantaneously.

Too many read round trips/Too much data read

Another typical performance problem with Cassandra reminds us of the SQL days and is typical for Cassandra beginners. It is a database design issue that leads to transactions making too many requests per end-user transaction or reading too much data. This is not a problem for Cassandra itself, but the simple fact of making many requests or reading more data slows down the actual transaction. While this issue can easily be monitored and discovered with an APM solution, the fix is not as trivial: in most cases it requires a change to the code and the data model.

Summary

Hadoop and Cassandra are both very scalable systems! But as often stated, scalability does not solve performance-efficiency issues, and as such neither of these systems is immune to such problems, nor to simple misuse.
Some of the prevalent performance problems are very specific to these systems, and we have not seen them in more traditional systems. Other issues are not really new, except for the fact that they now occur in systems that are tremendously bigger and more distributed than before. Their very scalability and size make these problems harder to diagnose (especially in Hadoop) while often having a very high impact (as in bringing down a Cassandra cluster). Performance experts can rejoice: they will have a job for a long time to come.
