Friday, 15 August 2014

What is Apache Tez?

Tez generalizes the MapReduce paradigm to a more powerful framework based on expressing computations as a dataflow graph. Tez is not meant directly for end users; rather, it enables developers to build end-user applications with much better performance and flexibility. Hadoop has traditionally been a batch-processing platform for large amounts of data, but there are many use cases that call for near-real-time query processing, and several workloads, such as machine learning, do not fit well into the MapReduce paradigm. Tez helps Hadoop address these use cases.
The Tez project aims to be highly customizable so that it can meet a broad spectrum of use cases without forcing people to go out of their way to make things work; projects such as Hive and Pig are seeing significant improvements in response times when they use Tez instead of MapReduce as the backbone for data processing. Tez is built on top of YARN, the resource-management framework for Hadoop.

Design Philosophy

The main reason for Tez to exist is to get around limitations imposed by MapReduce. Beyond being limited to writing mappers and reducers, there are other inefficiencies in force-fitting all kinds of computations into this paradigm; for example, HDFS is used to store temporary data between multiple MR jobs, which is an overhead. (In Hive, this is common when queries require multiple shuffles on keys without correlation, such as join - group by - window function - order by.)

The key elements forming the design philosophy behind Tez -
  • Empowering developers (and hence end users) to do what they want in the most efficient manner
  • Better execution performance
Some of the things that help Tez achieve these goals are –
  • Expressive Dataflow APIs - The Tez team wants to have an expressive dataflow-definition API so that you can describe the Directed Acyclic Graph (DAG) of computation that you want to run. For this, Tez has a structural API in which you add all the processors and edges and can visualize what you are actually constructing.
  • Flexible Input-Processor-Output runtime model – can construct runtime executors dynamically by connecting different inputs, processors and outputs.
  • Data type agnostic – only concerned with movement of data, not with the data format (key-value pairs, tuple oriented formats, etc)
  • Dynamic Graph Reconfiguration
  • Simple Deployment – Tez is completely a client-side application that leverages YARN local resources and the distributed cache. There's no need to deploy anything on your cluster as far as using Tez is concerned: you just upload the relevant Tez libraries to HDFS and then use your Tez client to submit jobs along with those libraries.

    You can even have two copies of the libraries on your cluster. One would be a production copy, the stable version that all your production jobs use. Your users can experiment with a second copy containing the latest version of Tez, and the two will not interfere with each other.
  • Tez can run any MR job without any modification. This allows for stage-wise migration of tools that currently depend on MR.
Exploring the Expressive Dataflow APIs in more detail – what can you do with them? For example, instead of using multiple MapReduce jobs, you can use the MRR pattern, in which a single map stage is followed by multiple reduce stages; this allows data to be streamed from one processor to the next without writing anything to HDFS (it is written to local disk only for check-pointing), leading to much better performance. The diagrams below demonstrate this -
The first diagram demonstrates a process that has multiple MR jobs, each storing intermediate results to the HDFS – the reducers of the previous step feeding the mappers of the next step. The second diagram shows how with Tez, the same processing can be done in just one job, with no need to access HDFS in between.
Tez’s flexibility means that it takes a bit more effort to start using than MapReduce; there is a bit more API and a bit more processing logic that you need to implement. This is fine, since Tez is not an end-user application like MapReduce; it is designed to let developers build end-user applications on top of it.
Given that overview of Tez and its broad goals, let's try to understand the actual APIs.

Tez API

The Tez API has the following components –
  • DAG (Directed Acyclic Graph) – defines the overall job. One DAG object corresponds to one job
  • Vertex – defines the user logic along with the resources and the environment needed to execute the user logic. One Vertex corresponds to one step in the job
  • Edge – defines the connection between producer and consumer vertices.

    Edges need to be assigned properties; these properties are essential for Tez to be able to expand that logical graph at runtime into the physical set of tasks that can be done in parallel on the cluster. There are several such properties –
    • The data-movement property defines how data moves from a producer to a consumer.
    • The scheduling property (sequential or concurrent) defines when the producer and consumer tasks can be scheduled relative to each other.
    • The data-source property (persisted, persisted-reliable or ephemeral) defines the lifetime and durability of a task's output, so that Tez knows when the producer task can safely be terminated.
You can view this Hortonworks article to see an example of the API in action, along with more detail about these properties and how the logical graph expands at run-time.
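As a rough illustration of how these pieces fit together, here is a minimal sketch of building an MRR-style DAG. The class and method names below are modeled on the Tez 0.5-era Java API and its bundled word-count example and may differ in other releases; the com.example.* processor class names are hypothetical placeholders for user-written processors.

    import org.apache.tez.dag.api.DAG;
    import org.apache.tez.dag.api.Edge;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
    import org.apache.tez.runtime.library.partitioner.HashPartitioner;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class MrrDagSketch {

      // Builds a map -> reduce -> reduce DAG; one DAG object corresponds to one
      // job, and each Vertex is one step of user logic with its own parallelism.
      public static DAG buildDag() {
        // The processor class names are hypothetical user code.
        Vertex map = Vertex.create("Map",
            ProcessorDescriptor.create("com.example.MyMapProcessor"), 4);
        Vertex reduce1 = Vertex.create("Reduce1",
            ProcessorDescriptor.create("com.example.MyReduceProcessor"), 2);
        Vertex reduce2 = Vertex.create("Reduce2",
            ProcessorDescriptor.create("com.example.MyFinalReduceProcessor"), 1);

        // A shuffle-style edge: its default EdgeProperty uses scatter-gather data
        // movement, persisted data-source durability and sequential scheduling.
        OrderedPartitionedKVEdgeConfig shuffle = OrderedPartitionedKVEdgeConfig
            .newBuilder(Text.class.getName(), IntWritable.class.getName(),
                HashPartitioner.class.getName())
            .build();

        return DAG.create("mrr-example")
            .addVertex(map).addVertex(reduce1).addVertex(reduce2)
            .addEdge(Edge.create(map, reduce1, shuffle.createDefaultEdgeProperty()))
            .addEdge(Edge.create(reduce1, reduce2, shuffle.createDefaultEdgeProperty()));
      }
    }

The resulting DAG object is what gets handed to the Tez client for submission, as sketched later in the scheduler section.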
The runtime API is based on an input-processor-output model which allows all inputs and outputs to be pluggable. To facilitate this, Tez uses an event-based model to communicate between tasks and the system, and between its various components. Events are used to pass information such as task failures to the components that need it, to let an Output tell the downstream Input where the data it generated is located, to enable run-time changes to the DAG execution plan, and so on.
Tez also ships with various Inputs, Processors and Outputs out-of-the-box.
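To make the runtime model concrete, the following is a minimal sketch of a custom processor in the style of the examples bundled with the Tez runtime library (SimpleProcessor, KeyValueReader and KeyValueWriter come from that library; the "Upstream" and "Downstream" names are hypothetical and would have to match the vertex names used in the DAG):

    import org.apache.tez.runtime.api.ProcessorContext;
    import org.apache.tez.runtime.library.api.KeyValueReader;
    import org.apache.tez.runtime.library.api.KeyValueWriter;
    import org.apache.tez.runtime.library.processor.SimpleProcessor;

    // A processor only sees logical Inputs and Outputs; how the bytes actually
    // move (local files, shuffle, memory) is determined by the edge configuration.
    public class PassThroughProcessor extends SimpleProcessor {

      public PassThroughProcessor(ProcessorContext context) {
        super(context);
      }

      @Override
      public void run() throws Exception {
        // "Upstream" and "Downstream" are hypothetical names of connected vertices.
        KeyValueReader reader =
            (KeyValueReader) getInputs().get("Upstream").getReader();
        KeyValueWriter writer =
            (KeyValueWriter) getOutputs().get("Downstream").getWriter();
        while (reader.next()) {
          // A real processor would apply user logic here; this one forwards pairs.
          writer.write(reader.getCurrentKey(), reader.getCurrentValue());
        }
      }
    }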
The expressive API allows writers of higher-level languages (such as Hive) to elegantly translate their queries into Tez jobs.

Tez Scheduler

The Tez scheduler considers many factors when deciding on task assignments: task-locality requirements, compatibility of containers, total available resources on the cluster, priority of pending task requests, automatic parallelization, freeing up resources that the application can no longer use (because the data is not local to it), and so on. It also maintains a connection pool of pre-warmed JVMs with shared registry objects. The application can choose to store different kinds of pre-computed information in those shared registry objects so that it can be reused without having to be recomputed later on, and this shared set of connections and container-pool resources can run tasks very fast.
You can read more about container reuse in Apache Tez.
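From the client side, the HDFS-based deployment, sessions and container reuse come together roughly as in this sketch (property and method names follow the Tez 0.5-era client API and may differ in other releases; the HDFS path is purely illustrative):

    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.DAG;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.dag.api.client.DAGClient;

    public class TezSessionSketch {
      public static void runTwoDags(DAG firstDag, DAG secondDag) throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        // Point the client at the Tez libraries previously uploaded to HDFS
        // (example path; nothing has to be installed on the cluster nodes).
        tezConf.set(TezConfiguration.TEZ_LIB_URIS, "hdfs:///apps/tez/tez.tar.gz");
        // Session mode keeps the ApplicationMaster (and, with container reuse,
        // warm JVMs) alive between DAGs, which gives low-latency re-submission.
        tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);

        TezClient tezClient = TezClient.create("example-session", tezConf);
        tezClient.start();
        try {
          DAGClient first = tezClient.submitDAG(firstDag);
          first.waitForCompletion();
          // The second DAG reuses the same session and any pooled containers.
          DAGClient second = tezClient.submitDAG(secondDag);
          second.waitForCompletion();
        } finally {
          tezClient.stop();
        }
      }
    }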

Flexibility

Overall, Tez provides a great deal of flexibility for developers to deal with complex processing logic. This can be illustrated with one example of how Hive is able to leverage Tez.
Let's take a typical TPC-DS query pattern in which you are joining multiple dimension tables with a fact table. Most optimizers and query systems can do what is shown in the top-right corner: if the dimension tables are small, then they can all be broadcast-joined with the large fact table, and you can do that same thing on Tez.
But what if these broadcast joins involve user-defined functions that are expensive to compute? You may not be able to do all of them in one step; you may have to break the processing up into different stages, and that is what the left-side topology shows. The first dimension table is broadcast-joined with the fact table, and the result is then broadcast-joined with the second dimension table.
Here, the third dimension table is not broadcastable because it is too large. You can choose to do a shuffle join instead, and Tez can efficiently navigate this topology rather than falling over just because the simple all-broadcast plan in the top-right corner is not feasible.
The two benefits for this kind of Hive query with Tez are:
  • it gives you full DAG support and does a lot automatically on the cluster, so that it can fully utilize the parallelism available in the cluster; as already discussed above, this means there is no need to read from or write to HDFS between multiple MR jobs, since all the computation can be done in a single Tez job.
  • it provides sessions and reusable containers, so that you get low latency and can avoid recomputation as much as possible.
This particular Hive query is seeing a performance improvement of more than 100% with the new Tez engine.

Roadmap

  • Richer DAG support. For example, can Samza use Tez as a substrate on which to build its applications? Tez needs some additional support to handle Samza's core scheduling and streaming requirements, and the Tez team wants to explore how to enable those kinds of connection patterns in its DAGs. The team also wants more fault-tolerance support, more efficient data transfer for further performance optimization, and improved session performance.
  • Given that these DAGs can get arbitrarily complex, a lot of automatic tooling is needed to help users understand their performance bottlenecks.

Summary

Tez is a distributed execution framework that works on computations represented as dataflow graphs. It maps naturally to higher-level declarative languages like Hive, Pig, Cascading, etc. It is designed with a highly customizable execution architecture so that dynamic performance optimizations can be made at runtime, based on real information about the data and the resources. The framework handles much of the hard work automatically, so you get good performance and efficiency right out of the box.
Tez aims to address a broad spectrum of use cases in the Hadoop data-processing domain, ranging from low-latency queries to complex batch execution. It is an open-source project. Tez works, Saha and Murthy suggest, and is already being used by Hive and Pig.

About the Authors

Arun Murthy is the lead of the MapReduce project in Apache Hadoop where he has been a full-time contributor to Apache Hadoop since its inception in 2006. He is a long-time committer and member of the Apache Hadoop PMC and jointly holds the current world sorting record using Apache Hadoop. Prior to co-founding Hortonworks, Arun was responsible for all MapReduce code and configuration deployed across the 42,000+ servers at Yahoo!

Bikas Saha has been working on Apache Hadoop for over a year and is a committer on the project. He has been a key contributor in making Hadoop run natively on Windows and has focused on YARN and the Hadoop compute stack. Prior to Hadoop, he worked extensively on the Dryad distributed data-processing framework, which runs on some of the world's largest clusters as part of the Microsoft Bing infrastructure.





Emerging Trends in Big Data Technologies

Big Data technologies have been getting a lot of attention over the last few years, and there are several trends and innovations happening in this space. InfoQ would like to learn which new trends in Big Data you are currently using or planning to use in the future.

Streaming Big Data analytics

  • Storm: Apache Storm is an open source distributed real-time computation system. Storm makes it easy to process streams of data, doing for real-time processing what Hadoop did for batch processing.
  • Spark: Spark is an in-memory data-processing platform that is compatible with Hadoop data sources but runs much faster than Hadoop MapReduce. It’s well suited for machine learning jobs, as well as interactive data queries, and is easier for many developers because it includes APIs in Scala, Python and Java.
  • Twitter's Summingbird: Summingbird is a library that lets you write streaming MapReduce programs and execute them on distributed MapReduce platforms like Storm and Scalding.
  • AWS Kinesis: Amazon Kinesis is a managed service for real-time processing of streaming data. It can collect and process large volumes of data from several different sources, allowing you to write applications that process information in real time from sources such as web-site click-streams, marketing and financial information, manufacturing instrumentation and social media, and operational logs and metering data.
  • DataTorrent: DataTorrent is a real-time streaming platform that enables businesses to perform data processing or transformations on structured or unstructured data, in real-time as the data is streaming into the data center. The product leverages Hadoop 2.0 and YARN technologies.
  • Spring XD: The Spring XD framework supports streams for the ingestion of event-driven data from a source to a sink, passing through any number of processors. The streams are backed by Spring Integration adapters.

Big Data (Hadoop) as a Service

  • Elastic MapReduce: Amazon Elastic MapReduce (Amazon EMR) is a web service that can be used to process large amounts of data. It uses Hadoop to distribute the data and processing across a resizable cluster of Amazon EC2 instances.
  • Qubole: Qubole's Big Data as a Service provides a Hadoop cluster with built-in data connectors and a graphical editor for the Big Data projects.
  • Mortar: Mortar is a general-purpose platform for high-scale data science. It's built on the Amazon Web Services cloud, using Elastic MapReduce (EMR) to launch Hadoop clusters and process large data sets. Mortar runs Apache Pig, a data flow language built on top of Hadoop. Mortar runs on open-source technologies like Hadoop, Pig, Java, Jython, and Luigi to let the users focus on the data science without worrying about IT infrastructure.
  • Rackspace: With Rackspace Hadoop clusters, you can run Hadoop on Rackspace managed dedicated servers, spin up Hadoop on the public cloud, or configure your own private cloud.
  • Joyent: Joyent Solution for Hadoop is a cloud-based hosting environment for your big data projects based on Apache Hadoop. It provides the data storage services to capture, analyze and access data in any format, data management services to process, monitor and operate Hadoop, and data platform services to secure, archive and scale for consistent availability.
  • Google: Hadoop on Google Cloud Platform uses the open-source Apache Hadoop on Google Compute Engine virtual machines.

SQL-in-Hadoop

  • Apache Hive: Apache Hive facilitates querying and managing large datasets residing in distributed storage. It also allows MapReduce programmers to plug in custom mappers and reducers.
  • Impala: Cloudera’s Impala is an open source massively parallel processing (MPP) SQL query engine that runs natively in Apache Hadoop. It enables users to directly query data stored in HDFS and Apache HBase without requiring data movement or transformation.
  • Shark: Shark is a data warehouse system for Spark designed to be compatible with Apache Hive. Shark supports Hive's query language, metastore, serialization formats, and user-defined functions.
  • Spark SQL: Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using Spark. Spark SQL is currently an alpha component.
  • Apache Drill: Apache Drill, currently an Apache incubation project, provides ad-hoc queries against different data sources, including nested data. Inspired by Google's Dremel, Drill is designed for scalability and the ability to query large sets of data. This project is backed by MapR.
  • Apache Tajo: Apache Tajo is a big data relational and distributed data warehouse system for Apache Hadoop. Tajo is designed for low-latency and scalable ad-hoc queries, online aggregation, and ETL (extract-transform-load process) on large-data sets stored on HDFS (Hadoop Distributed File System) and other data sources.
  • Presto: Presto, a framework from Facebook, is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes.
  • Phoenix: Phoenix, from Salesforce, is an open source SQL query engine for Apache HBase. It is accessed as a JDBC driver and enables querying and managing HBase tables using SQL. It was submitted as a proposal to become an Apache Incubator project.
  • Pivotal's HAWQ: HAWQ, part of Pivotal's Big Data Suite, is an MPP SQL processing engine optimized for analytics with full transaction support. It breaks complex queries into small tasks and distributes them to MPP query-processing units for execution.

Big Data Lambda Architecture

The Lambda Architecture (LA) provides a hybrid platform by combining real-time data with data pre-computed in the Hadoop environment, giving a near-real-time view of the data at all times. Lambda Architecture frameworks include the following:
  • Twitter's Summingbird
  • Lambdoop: Lambdoop is a new Big Data middleware designed for data scientists and developers to build Big Data solutions combining streaming and batch data analytics.
Please rank the following Big Data technology trends, drawing on YOUR experience working with them or researching them for potential use in production environments, according to the following criteria:
  • Value Proposition: The value these technologies can potentially bring to the business.
  • Adoption Readiness: How ready they are to be used in real-world applications.