Friday, 15 August 2014

Tokutek Announces New Consensus Algorithm for MongoDB

Tokutek has announced work on a new consensus algorithm with the goal of replacing the existing leader election algorithm in MongoDB. Ark, as the algorithm is named, is being developed for use in TokuMX, Tokutek’s fork of MongoDB, and addresses a number of issues with the existing MongoDB algorithm.
The design is heavily influenced by the Raft and Paxos algorithms and aims to provide the same provably strong consistency guarantees. It differs from Raft to enable it to support the MongoDB architecture and programming model, implementing an asynchronous, pull-based replication model. This, the developers claim,
…supports a wider range of client semantics that allows application developers to choose points along a trade-off between safety and latency. In addition, Ark supports different replication topologies like chained replication and multi-data center replication with more flexibility than Raft does with its synchronous push model.
Tokutek explained the need for this new algorithm by pointing out two issues with the existing MongoDB leader election algorithm. The primary issue is one of correctness. In the blog post announcing Ark, Zardosht Kasheff points out that it is possible for updates that succeed with the majority write concern to roll back.
Our main goal is to modify the election protocol to make TokuMX a true CP system. That is, in the face of network partitions, TokuMX will remain consistent. To do so means ensuring that any write that is successfully acknowledged with majority write concern is never lost in the face of a network partition. This is not currently the case for TokuMX and MongoDB.
The secondary issue that Tokutek draws attention to is one of availability. In the accompanying tech report, Zardosht and coauthor Leif Walsh explain that it is possible for a MongoDB replica set to be unavailable for 30 seconds or more during failover.
MongoDB’s election protocol requires that a member may not vote “yes” in more than one election in any 30-second period. … [T]his 30 second threshold can be problematic in practice, especially if an election fails: this necessarily makes the set unavailable for at least 30 seconds, maybe more if successive elections fail.
Ark addresses these flaws by exploiting the structure of the TokuMX global transaction identifier (GTID). The GTID is a pair of 64-bit integers, (term, opid): opid is incremented each time an operation commits on the primary, and term is incremented each time a new primary is elected, at which point opid is reset to 0. The term in the GTID serves the same purpose as the term concept in the Raft protocol, and that similarity allows Ark to employ many of the same solutions that Raft uses to provide its strong consistency guarantees.
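To make the structure concrete, here is a minimal Java sketch, purely illustrative (it is not Tokutek's code, and the class and field names are assumptions), of such a (term, opid) pair and its natural ordering:

public final class Gtid implements Comparable<Gtid> {
    private final long term;  // incremented each time a new primary is elected
    private final long opid;  // incremented per committed operation; reset to 0 on a new term

    public Gtid(long term, long opid) {
        this.term = term;
        this.opid = opid;
    }

    @Override
    public int compareTo(Gtid other) {
        // Order by term first, then by opid within a term, so operations
        // committed under a newer primary always sort after those from an older one.
        int byTerm = Long.compare(this.term, other.term);
        return byTerm != 0 ? byTerm : Long.compare(this.opid, other.opid);
    }
}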
While Ark is an implementation of a consensus protocol that works in a real database system, it is also evidence of the flexibility in the Raft consensus algorithm. It was relatively straightforward to tweak Raft in safe ways to make it fit the MongoDB architecture and programming model, and we think this is an important feature of Raft.
There is an Ark development branch available and Tokutek is actively soliciting feedback on both the design and the implementation.

Building Applications With Hadoop

Avro

Avro is a project for data serialization and data formats, similar to Thrift or Protocol Buffers. It's expressive: you can deal in terms of records, arrays, unions, and enums. It's efficient, with a compact binary representation; one of the benefits of logging in Avro is that you get much smaller data files. All the traditional properties you want from Hadoop data formats, like being compressible and splittable, are true of Avro.
One of the reasons Doug Cutting (founder of the Hadoop project) created the Avro project was that a lot of the formats in Hadoop were Java-only. It’s important for Avro to be interoperable with a lot of different languages – Java, C, C++, C#, Python, Ruby, etc. – and to be usable by a lot of tools.
One of the goals for Avro is a set of formats and serialization that's usable throughout the data platform that you're using, not just in a subset of the components. So MapReduce, Pig, Hive, Crunch, Flume, Sqoop, etc. all support Avro.
Avro is dynamic, and one of its neat features is that you can read and write data without generating any code. It will use reflection and look at the schema that you've given it to create records on the fly; that's called the Avro generic format. You can also use the specific format, for which Avro will generate optimized code from the schema.
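As a hedged sketch of the generic approach (the schema and file names here are illustrative, not from the article), reading and writing with nothing but a runtime schema looks roughly like this in Java:

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class GenericAvroExample {
    public static void main(String[] args) throws Exception {
        // Parse a schema at runtime; there is no code-generation step.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}");

        // Build a record dynamically against that schema.
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "Ada");
        user.put("age", 36);

        // Write it to an Avro container file.
        File file = new File("users.avro");
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, file);
            writer.append(user);
        }

        // Read it back; the schema travels with the file.
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord r : reader) {
                System.out.println(r.get("name") + " is " + r.get("age"));
            }
        }
    }
}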
Avro was designed with the expectation that you would change your schema over time. That's an important attribute in a big-data system because you generate lots of data, and you don’t want to constantly reprocess it. You're going to generate data at one time and have tools process that data maybe two, three, or four years down the line. Avro has the ability to negotiate differences between schemata so that new tools can read old data and vice versa.
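A brief, illustrative sketch of that negotiation in the Java API (the schemas are made up for the example): the reader supplies both the writer's old schema and its own newer schema, and Avro resolves the differences, filling added fields from their defaults:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

public class SchemaEvolutionExample {
    public static void main(String[] args) {
        // Schema the data was originally written with.
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Newer reader schema that adds a field with a default value,
        // so old data can still be read by new tools.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");

        // The reader resolves the writer schema against the reader schema:
        // missing fields are filled from defaults, removed fields are skipped.
        // This reader would then be passed to a DataFileReader as in the previous example.
        DatumReader<GenericRecord> reader =
            new GenericDatumReader<>(writerSchema, readerSchema);
    }
}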
Avro forms an important basis for the following projects.

Crunch

You're probably familiar with Pig and Hive, how to process data with them, and how they integrate with other tools. However, not all of the data you work with will fit Pig and Hive.
Pig and Hive are great for a lot of logged data or relational data, but other data types don’t fit as well. You can still process poorly fitting data with Pig and Hive, which don’t force you into a relational model or a log structure, but you have to do a lot of work around it. You might find yourself writing unwieldy user-defined functions or doing things that are not natural in the language. People sometimes just give up and start writing raw Java MapReduce programs because that's easier.
Crunch was created to fill this gap. It's a higher-level API than MapReduce. It's in Java. It's lower level than, say, Pig, Hive, Cascading, or other frameworks you might be used to. It's based on a paper that Google published called FlumeJava, and it has a very similar API. Crunch has you combine a small number of primitives with a small number of types, and it effectively allows the user to create really lightweight UDFs, which are just Java methods and classes, to build complex data pipelines.
Crunch has a number of advantages.
  • It's just Java. You have access to a full programming language.
  • You don't have to learn Pig.
  • The type system is well-integrated. You can use Java POJOs, but there's also native support for Hadoop Writables and Avro. There's no impedance mismatch between the Java code you're writing and the data that you're analyzing.
  • It's built as a modular library for reuse. You can capture your pipelines in Crunch code in Java and then combine them with an arbitrary machine-learning program later, so that someone else can reuse that algorithm.
The fundamental structure is the parallel collection: a distributed, unordered collection of elements. The collection has a parallel-do operator, which you can imagine turning into a MapReduce job. So if you have a bunch of data that you want to operate on in parallel, you use a parallel collection.
There's also something called the parallel table, a subinterface of the collection, which is a distributed, sorted map. It has a group-by operator you can use to aggregate all the values for a given key. We'll go through an example that shows how that works.
Finally, there's a pipeline class; pipelines coordinate the execution of the MapReduce jobs that actually do the back-end processing for a Crunch program.
Let’s take an example for which you've probably seen all the Java code before, word count, and see what it looks like in Crunch.
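A representative sketch of a Crunch word count (input and output paths taken from the command line; this is illustrative, not the article's exact listing) looks roughly like this:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // The first line creates a pipeline that plans and runs the MapReduce jobs.
        Pipeline pipeline = new MRPipeline(WordCount.class);

        // A parallel collection of all the lines from a given file.
        PCollection<String> lines = pipeline.readTextFile(args[0]);

        // An anonymous DoFn splits each line into words and emits each word.
        PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
            @Override
            public void process(String line, Emitter<String> emitter) {
                for (String word : line.split("\\s+")) {
                    emitter.emit(word);
                }
            }
        }, Writables.strings());

        // Aggregate the counts for each word and write them out.
        PTable<String, Long> counts = words.count();
        pipeline.writeTextFile(counts, args[1]);

        // Nothing actually runs until this point: the planner is lazy.
        pipeline.run();
    }
}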
It’s a lot smaller and simpler. The first line creates a pipeline. We create a parallel collection of all the lines from a given file by using the pipeline class. And then we get a collection of words by running the parallel do operator on these lines.
We define an anonymous function here that processes the input: for word count it splits each line into words and emits each word from the map tasks.
Finally, we want to aggregate the counts for each word and write them out. There's a line at the bottom, pipeline.run(). Crunch's planner does lazy evaluation: it won't create and run the MapReduce jobs until we've put the full pipeline together.
If you're used to programming Java and you've seen the Hadoop examples for writing word count in Java, you can tell that this is a more natural way to express that. This is among the simplest pipelines you can create, and you can imagine you can do many more complicated things.
If you want to go even one step easier than this, there's a Scala wrapper for Crunch. It's a very similar idea to Scalding, the Scala wrapper for Cascading, and since Crunch is based on Google's FlumeJava and Scala runs on the JVM, it's an obvious, natural fit. Scala's type inference actually ends up being really powerful in the context of Crunch.
In Scala, the same program uses the pipeline plus Scala's built-in functions, which map really nicely to Crunch, so word count becomes essentially a one-line program. It's pretty cool and very powerful if you're already writing Java code and want to build complex pipelines.

Cloudera ML

Cloudera ML (machine learning) is an open-source library and set of tools that help data scientists with their day-to-day tasks, from data preparation through model evaluation.
It has built-in commands for summarizing, sampling, normalizing, and pivoting data, and it recently added a built-in k-means clustering implementation based on an algorithm developed only a year or two earlier. There are a couple of other implementations as well. It's meant to be a home for these tools so you can focus on data analysis and modeling instead of on building or wrangling tools.
It’s built using Crunch and leverages a lot of existing projects. For example, the vector formats: a lot of machine learning involves transforming raw data from a record format into vector formats for machine-learning algorithms, and Cloudera ML leverages Mahout's vector interface and classes for that purpose. The record format is a thin wrapper around Avro and HCatalog record and schema formats, so you can easily integrate with existing data sources.
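As a rough illustration of that record-to-vector step (this is not Cloudera ML's actual code, and the field names are hypothetical), converting an Avro generic record's numeric fields into a Mahout vector might look like this:

import org.apache.avro.generic.GenericRecord;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class RecordToVector {
    // Illustrative only: turn two numeric fields of a record into a
    // Mahout Vector suitable for a clustering algorithm such as k-means.
    // The field names "age" and "income" are made up for the example.
    public static Vector toVector(GenericRecord record) {
        double[] values = new double[] {
            ((Number) record.get("age")).doubleValue(),
            ((Number) record.get("income")).doubleValue()
        };
        return new DenseVector(values);
    }
}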
For more information on Cloudera ML, visit the project's GitHub page; there are a bunch of examples with datasets that can get you started.

Cloudera Development Kit

Like Cloudera ML, the Cloudera Development Kit (CDK) is a set of open-source libraries and tools that make writing applications on Hadoop easier. Unlike Cloudera ML, though, it's not focused on machine learning and data science; it's directed at developers trying to build applications on Hadoop. It's really the plumbing for a lot of different frameworks and pipelines and the integration of a lot of different components.
The purpose of the CDK is to provide higher level APIs on top of the existing Hadoop components in the CDH stack that codify a lot of patterns in common use cases.
CDK is prescriptive, has an opinion on the way to do things, and tries to make it easy for you to do the right thing by default, but it's architected as a system of loosely coupled modules. You can use the modules independently of each other. It's not an uber-framework that you have to adopt wholesale; you can adopt it piecemeal. It doesn't force you into any particular programming paradigm, and it doesn't force you to adopt a ton of dependencies; you only take on the dependencies of the particular modules you want.
Let's look at an example. The first module in CDK is the data module, and its goal is to make it easier to work with datasets on Hadoop file systems. There are a lot of gory details to get right to make this work in practice: you have to worry about serialization, deserialization, compression, partitioning, directory layout, and communicating that directory layout and partitioning scheme to other people who want to consume the data.
The CDK data module handles all this for you. It automatically serializes and deserializes data from Java POJOs, if that's what you have, or Avro records if you use them. It has built-in compression, and built-in policies around file and directory layouts so that you don't have to repeat a lot of these decisions and you get smart policies out of the box. It will automatically partition data within those layouts. It lets you focus on working on a dataset on HDFS instead of all the implementation details. It also has plugin providers for existing systems.
Imagine you're already using Hive and HCatalog as a metadata repository, and you've already got a schema for what these files look like. CDK integrates with that. It doesn't require you to define all of your metadata for your entire data repository from scratch. It integrates with existing systems.
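As a hedged sketch of what using the data module can look like (the CDK data module later became the Kite SDK; the class names, dataset URI, and schema resource below follow that later API and should be read as assumptions, not the article's exact code):

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;

public class DatasetExample {
    public static void main(String[] args) {
        // Describe the dataset with an Avro schema; the module takes care of
        // serialization, compression, and file/directory layout on HDFS.
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
            .schemaUri("resource:user.avsc")  // hypothetical schema resource
            .build();

        // Create the dataset at an HDFS location and write records to it.
        Dataset<GenericRecord> users =
            Datasets.create("dataset:hdfs:/data/users", descriptor, GenericRecord.class);

        try (DatasetWriter<GenericRecord> writer = users.newWriter()) {
            GenericRecord user = new GenericData.Record(descriptor.getSchema());
            user.put("name", "Ada");
            writer.write(user);
        }
    }
}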
You can learn more about the various CDK modules and how to use them in the documentation.
In summary, working with data from various sources, preparing and cleansing it, and processing it via Hadoop involves a lot of work. Tools such as Crunch, Cloudera ML, and the CDK make it easier to do this and to leverage Hadoop more effectively.

About the Author

Eli Collins is the tech lead for Cloudera's Platform team, an active contributor to Apache Hadoop and member of its project management committee (PMC) at the Apache Software Foundation. Eli holds Bachelor's and Master's degrees in Computer Science from New York University and the University of Wisconsin-Madison, respectively.
