Friday 15 August 2014

Building a Real-time, Personalized Recommendation System with Kiji

Today, recommendations are everywhere online. Major e-commerce websites like Amazon provide product recommendations in many different forms across their web properties. Financial planning sites like Mint.com recommend things like credit cards that a user might want to sign up for, or banks that can offer better interest rates. Google augments search results using its knowledge of a user's past searches in order to find the most relevant results.
These brands use recommendations to provide a contextual, relevant user experience in order to increase conversion rates and user satisfaction. Traditionally, these sorts of recommendations have been computed by batch processes that generate new recommendations on a nightly, weekly, or even monthly basis.
However, some types of recommendations, such as a geolocation-based offer to a consumer, require a much faster reaction than batch processing allows. Consider a movie recommendation system: if a user historically watches action movies but is currently searching for a comedy, batch recommendations will likely suggest more action movies instead of the most relevant comedies. In this article, you will learn how to use Kiji, an open-source framework for building Big Data applications, to build a system that provides real-time recommendations.

Kiji, Entity-Centric Data, and the 360º View

To build a real-time recommendation system, we first need a system that can be used to store a 360º view of our customers. Moreover, we need to be able to retrieve data about a particular customer quickly in order to produce recommendations as they interact with our website or mobile app. Kiji is an open-source, modular framework for building real-time applications that collect, store and analyze this sort of data.
More generally, the data necessary for a 360º view can be termed entity-centric data. An entity could be any number of things such as a customer, user, account, or something more abstract like a point-of-sale system or a mobile device.
The goal of an entity-centric storage system is to store everything about a particular entity in a single row. This is challenging with traditional relational databases because the information may include both stateful data (like name and email address) and event streams (like clicks). A traditional system stores this data in multiple tables that must be joined together at processing time, which makes real-time processing harder. To deal with this challenge, Kiji leverages Apache HBase, which stores data in four dimensions: row, column family, column qualifier, and timestamp. By leveraging the timestamp dimension, and HBase's ability to store multiple versions of a cell, Kiji can store event-stream data alongside the more stateful, slowly changing data.
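To make the four-dimensional layout concrete, here is a minimal in-memory Python sketch of how stateful fields and event streams can share one entity row. It is not HBase's or Kiji's API; the EntityTable class and its methods are hypothetical, and real HBase adds persistence, column-family configuration and configurable version limits.

```python
class EntityTable:
    """Toy in-memory model of HBase's row / family / qualifier / timestamp layout."""

    def __init__(self):
        # row key -> (family, qualifier) -> {timestamp: value}
        self._rows = {}

    def put(self, row, family, qualifier, timestamp, value):
        cells = self._rows.setdefault(row, {})
        cells.setdefault((family, qualifier), {})[timestamp] = value

    def get_versions(self, row, family, qualifier):
        """All versions of one cell as (timestamp, value) pairs, newest first."""
        versions = self._rows.get(row, {}).get((family, qualifier), {})
        return sorted(versions.items(), reverse=True)

    def get_latest(self, row, family, qualifier):
        """The most recent version of a cell, or None if it was never written."""
        versions = self.get_versions(row, family, qualifier)
        return versions[0][1] if versions else None


table = EntityTable()
# Stateful data: readers usually only care about the newest email address.
table.put("user-42", "info", "email", 1, "old@example.com")
table.put("user-42", "info", "email", 5, "new@example.com")
# Event stream: every click is kept as its own timestamped version.
table.put("user-42", "events", "click", 2, "/movies/123")
table.put("user-42", "events", "click", 3, "/movies/456")
```

The same row holds both kinds of data; the timestamp dimension is what lets the click history live next to the profile fields without a separate table.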
HBase is a key-value store built on top of HDFS, the Hadoop Distributed File System, which provides the scalability that is necessary for a Big Data solution. A major challenge in developing applications on HBase is that all data going in and out of the system must be byte arrays. To deal with this, the final core component of Kiji is Apache Avro, which Kiji uses to store easily processed data types like standard strings and integers, as well as more complex user-defined data types. Kiji handles any necessary serialization and deserialization for the application when reading or writing data.
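The serialization layer can be pictured as a thin wrapper that converts values to bytes on write and back on read. This is a sketch under stated assumptions: Kiji uses Avro schemas, but the standard-library json module stands in for Avro here, and the write_cell and read_cell helpers are hypothetical names, not Kiji calls.

```python
import json

# Stand-in for an HBase table: keys and values are all raw bytes.
raw_cells = {}

def _key(row, column):
    return f"{row}/{column}".encode("utf-8")

def write_cell(row, column, value):
    """Serialize an application value to bytes before storing it (JSON stands in for Avro)."""
    raw_cells[_key(row, column)] = json.dumps(value, sort_keys=True).encode("utf-8")

def read_cell(row, column):
    """Fetch the raw bytes and deserialize them back into a Python value."""
    return json.loads(raw_cells[_key(row, column)].decode("utf-8"))

profile = {"name": "Ada", "age": 36, "favorite_genres": ["sci-fi", "comedy"]}
write_cell("user-42", "info:profile", profile)
```

Application code only ever sees dicts, strings and numbers; the byte-level encoding stays inside the two helpers, which is the role Kiji plays for HBase.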

Developing Models for Use in Real Time

Kiji provides two APIs for developing models, one in Java and one in Scala, each of which has a batch and a real-time component. The purpose of this split is to break a model down into distinct phases of execution. The batch phase is a training phase, typically a learning process in which the model is trained over a dataset for the entire population. The output of this phase might be the parameters of a linear classifier, the locations of clusters for a clustering algorithm, or a similarity matrix relating items to one another in a collaborative filtering system. The real-time phase is known as the scoring phase; it combines the trained model with an entity's data to produce derived information. Critically, this derived data is considered first-class, in that it can be stored back in the entity's row for use in serving recommendations or as input to later computations.
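The split between training and scoring can be sketched with two plain Python functions. The names train and score, and the genre-popularity model itself, are hypothetical and chosen only for brevity; they are not the KijiMR or KijiExpress APIs. The point is the shape: training reads the whole population, while scoring reads one entity's row and writes its derived output back into that row.

```python
from collections import Counter

def train(all_views):
    """Batch phase: learn a population-wide model, here each genre's share of views."""
    counts = Counter(genre for _user, genre in all_views)
    total = sum(counts.values())
    return {genre: n / total for genre, n in counts.items()}

def score(model, user_row):
    """Real-time phase: combine the trained model with a single entity's data."""
    recent = Counter(user_row["recent_genres"])
    # Boost genres this user viewed recently; the +1 keeps unseen genres in the ranking.
    ranked = sorted(model, key=lambda g: (recent[g] + 1) * model[g], reverse=True)
    # Derived data is first-class: store it back in the entity's row.
    user_row["recommended_genres"] = ranked
    return ranked

views = [("u1", "action"), ("u2", "action"), ("u3", "comedy"), ("u1", "action"), ("u4", "drama")]
model = train(views)
user_row = {"recent_genres": ["comedy", "comedy", "comedy"]}
ranked = score(model, user_row)
```

Even though the population leans toward action movies, a user who has just been browsing comedies gets comedy ranked first, which is the behavior the movie example earlier in this article calls for.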
The Java APIs are called KijiMR, and the Scala APIs form the core of a tool called KijiExpress. KijiExpress leverages the Scalding library to provide APIs for building complex MapReduce workflows, while avoiding a significant amount of boilerplate code typically associated with Java, as well as the job scheduling and coordination that is necessary for stringing together MapReduce jobs.

Individuals Versus Populations

The reason for separating batch training from real-time scoring is that Kiji is built on the observation that population trends change slowly, while individual trends change quickly.
Consider a dataset for a user population that contains ten million purchases. One more purchase is not likely to dramatically affect the population's overall trends or its likes and dislikes. However, if a particular user has only ever made ten purchases, the eleventh purchase will have a huge effect on what a system can determine about that user's interests. Given this observation, an application only needs to retrain its model once enough new data has been gathered to affect population trends, while it can still improve recommendation relevancy for an individual user by reacting to their behavior in real time.
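A quick calculation shows how lopsided the effect is. The 20% comedy share below is an invented figure used only for illustration.

```python
# Hypothetical population: 10,000,000 purchases, 20% of them comedies.
pop_before = 2_000_000 / 10_000_000
pop_after = 2_000_001 / 10_000_001  # one more comedy purchase

# Hypothetical user: 10 purchases, none of them comedies; the 11th is a comedy.
user_before = 0 / 10
user_after = 1 / 11

population_shift = pop_after - pop_before
individual_shift = user_after - user_before
print(f"population comedy share moved by {population_shift:.8f}")
print(f"individual comedy share moved by {individual_shift:.4f}")
```

The population's comedy share moves by less than one ten-millionth, while the individual's jumps by roughly nine percentage points.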

Scoring Against a Model in Real Time

To score in real time, the KijiScoring module provides a lazy computation system that allows an application to refresh recommendations only for users who are actively interacting with the application. Through lazy computation, Kiji applications can avoid generating recommendations for users who visit infrequently or may never return for a second visit. This also has the added benefit that Kiji can take into account contextual information, like the location of the user's mobile device at the time of the recommendation.
The primary component in KijiScoring is called a Freshener. A Freshener is really a combination of two other Kiji components: a ScoringFunction and a FreshnessPolicy. As mentioned earlier, a model consists of both a training phase and a scoring phase. The ScoringFunction is the piece of code that describes how a trained model and a single entity's data are combined to produce a score or recommendations. A FreshnessPolicy defines when data becomes stale, or out-of-date. For example, a common FreshnessPolicy will say that data is out-of-date once it is more than an hour or so old. A more complex policy might mark data as out-of-date once an entity has experienced some number of events, like clicks or product views. Finally, the ScoringFunction and FreshnessPolicy are attached to a particular column in a Kiji table; reading that column triggers a refresh of the data, if necessary.
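The two example policies can be expressed as small objects with a common is_fresh check. The class names and the StoredValue record are hypothetical Python stand-ins; Kiji's actual FreshnessPolicy interface is a Java API and differs in its details.

```python
from dataclasses import dataclass

@dataclass
class StoredValue:
    value: object
    written_at: float        # seconds since the epoch when the value was computed
    events_since_write: int  # e.g. clicks logged after the value was computed

class MaxAgePolicy:
    """Stale once the stored value is older than max_age_seconds."""

    def __init__(self, max_age_seconds):
        self.max_age_seconds = max_age_seconds

    def is_fresh(self, stored, now):
        return now - stored.written_at <= self.max_age_seconds

class EventCountPolicy:
    """Stale once the entity has logged max_events events since the last scoring."""

    def __init__(self, max_events):
        self.max_events = max_events

    def is_fresh(self, stored, now):
        return stored.events_since_write < self.max_events

hourly = MaxAgePolicy(3600)
after_five_clicks = EventCountPolicy(5)
recs = StoredValue(value=["movie-1"], written_at=0.0, events_since_write=2)
```

Because both policies share one method, the code that decides whether to rescore does not need to know which kind of policy is attached to a column.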
Applications that do real-time scoring will include a tier of servers called KijiScoring servers, which act as an execution layer for refreshing stale data. When a user interacts with the application, the request is passed to the KijiScoring server tier, which communicates directly with the HBase cluster. The KijiScoring server requests the data and, once it is retrieved, determines whether or not the data is up-to-date according to the FreshnessPolicy. If the data is up-to-date, it can simply be returned to the client. If the data comes back stale, however, the KijiScoring server runs the specified ScoringFunction for the user who made the request. The important point is that the data or recommendations are refreshed only for the user making the request, rather than in a batch operation that would refresh the data for all users. This is how Kiji avoids doing more work than necessary. Once the data is refreshed, it's returned to the user and written back to HBase for later use.
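The lazy read path can be sketched as follows, assuming a hypothetical ScoringServer class, a plain dict in place of HBase, and an age-based freshness test. Only the requested entity is ever scored, and a rescored value is written back so later reads can serve it directly.

```python
import time

class ScoringServer:
    """Hypothetical sketch of a KijiScoring-style lazy read path."""

    def __init__(self, table, column, is_fresh, scoring_function, clock=time.time):
        self.table = table                        # entity id -> {column: (value, written_at)}
        self.column = column
        self.is_fresh = is_fresh                  # (written_at, now) -> bool
        self.scoring_function = scoring_function  # entity row -> new value
        self.clock = clock

    def get(self, entity_id):
        row = self.table.setdefault(entity_id, {})
        now = self.clock()
        cached = row.get(self.column)
        if cached is not None and self.is_fresh(cached[1], now):
            return cached[0]                      # fresh: serve the stored value
        value = self.scoring_function(row)        # stale or missing: score this entity only
        row[self.column] = (value, now)           # write back for later reads
        return value


calls = []

def recommend(row):
    calls.append(row)
    return [f"movie-{len(calls)}"]

fake_now = [0.0]
server = ScoringServer({}, "recs", lambda written, now: now - written <= 3600,
                       recommend, clock=lambda: fake_now[0])
first = server.get("user-42")   # nothing stored yet, so it is scored
second = server.get("user-42")  # still fresh, served from the table
fake_now[0] = 7200.0
third = server.get("user-42")   # older than an hour, so it is rescored
```

The table ends up holding recommendations for user-42 only; users who never send a request are never scored.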
A typical Kiji application will include some number of KijiScoring servers. Because these are stateless Java processes that need only a single entity's data as input to run a ScoringFunction, they can be scaled out simply by adding more of them.

Deploying Models to a Production System

A major goal in a real-time recommendation system is to be able to iterate on the underlying predictive models easily, without application downtime when pushing new or improved models into production. To do that, Kiji provides the Kiji Model Repository, which combines metadata about how the models execute with the code that is used to train and score them. The KijiScoring server needs to know which column accesses should trigger freshening, the FreshnessPolicy to be applied, and the ScoringFunction that will be executed against user data, as well as the locations of any trained models or external data necessary for scoring. This metadata is stored in a Kiji system table, which at the lowest level is just another HBase table. Additionally, the Model Repository stores code artifacts for registered models in a managed Maven repository. The KijiScoring server periodically polls the Model Repository for newly registered or unregistered models, and loads or unloads code as necessary.
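One way to picture the polling step is as a reconciliation between the models a server has loaded and the models currently registered. This sketch assumes a hypothetical sync_models helper whose load and unload callbacks stand in for fetching and releasing Maven artifacts; it is not Kiji's implementation. A server would call it on a timer.

```python
def sync_models(loaded, registered, load, unload):
    """Reconcile the models a server has loaded with those currently registered.

    loaded:      dict of model id -> loaded model object (mutated in place)
    registered:  set of model ids currently listed in the repository
    load/unload: callbacks that fetch or release a model's code artifact
    """
    # Unregistered since the last poll: release the code.
    for model_id in sorted(set(loaded) - registered):
        unload(loaded.pop(model_id))
    # Newly registered: fetch and load the code.
    for model_id in sorted(registered - set(loaded)):
        loaded[model_id] = load(model_id)
    return loaded

loaded = {"recs-v1": "code:recs-v1"}
unloaded = []
sync_models(loaded, {"recs-v2"}, load=lambda m: "code:" + m, unload=unloaded.append)
```

Swapping recs-v1 for recs-v2 happens without restarting the server, which is the downtime-free iteration the Model Repository is meant to enable.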

Putting It All Together

A very common way to provide recommendations is through collaborative filtering. Collaborative filtering algorithms typically involve building a large similarity matrix that relates each product to the other products in the product catalog. Each row in the matrix represents a product p_i, and each column represents another product p_j. The value at (p_i, p_j) is the similarity between the two products.
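The article does not specify a similarity measure; cosine similarity between products' rating vectors is one common choice, and the sketch below builds the p_i by p_j matrix that way. The ratings data and the function name are hypothetical. Only nonzero similarities are stored, which keeps the matrix sparse.

```python
import math

def cosine_similarity_matrix(ratings):
    """ratings: user -> {product: rating}; returns product -> {other product: similarity}."""
    # Pivot so that each product becomes a sparse vector of user ratings.
    by_product = {}
    for user, user_ratings in ratings.items():
        for product, rating in user_ratings.items():
            by_product.setdefault(product, {})[user] = rating
    norms = {p: math.sqrt(sum(r * r for r in vec.values())) for p, vec in by_product.items()}
    sims = {p: {} for p in by_product}
    products = sorted(by_product)
    for i, p_i in enumerate(products):
        for p_j in products[i + 1:]:
            common = by_product[p_i].keys() & by_product[p_j].keys()
            dot = sum(by_product[p_i][u] * by_product[p_j][u] for u in common)
            if dot:  # store only nonzero entries to keep the matrix sparse
                sims[p_i][p_j] = sims[p_j][p_i] = dot / (norms[p_i] * norms[p_j])
    return sims

ratings = {
    "alice": {"A": 5, "B": 4},
    "bob": {"A": 4, "B": 5, "C": 1},
    "carol": {"C": 5, "D": 4},
}
sims = cosine_similarity_matrix(ratings)
```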
In Kiji, the similarity matrix is computed by a batch training process and can then be stored in a file or a Kiji table. Each row of the similarity matrix would be stored in its own column, in the corresponding product's row of the Kiji product table. In practice, this column has the potential to be very large, since it would list every product in the catalog along with its similarity. Typically, the batch job will therefore also pick only the most similar items to put into the table.
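Picking only the most similar items amounts to truncating each row of the matrix to its top k entries before writing it to the product's column. A minimal sketch, with k as a hypothetical tuning parameter and an invented similarity matrix:

```python
import heapq

def top_k_neighbors(similarities, k):
    """Keep only the k most similar products in each product's row."""
    return {
        product: heapq.nlargest(k, row.items(), key=lambda item: item[1])
        for product, row in similarities.items()
    }

sims = {"A": {"B": 0.9, "C": 0.2, "D": 0.5}, "B": {"A": 0.9}}
top2 = top_k_neighbors(sims, 2)
```

Each truncated row is a short list of (product, similarity) pairs, most similar first, which bounds the size of the stored column regardless of catalog size.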
This similarity matrix is accessed at scoring time through the KeyValueStore API, which gives processes access to external data. For matrices that are too large to fit in memory, storing the matrix in a distributed table lets the application request only the data that the computation needs, dramatically reducing memory requirements.
Since we’ve done a lot of the heavy lifting ahead of the scoring phase, scoring becomes a fairly simple operation. If we wanted to display recommendations based on an item that was viewed, a non-personalized scoring function would just look up the related products from the product table and display those.
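With truncated neighbor lists precomputed, a non-personalized scoring function reduces to a lookup. In this sketch, product_table is a hypothetical dict standing in for the product table that would be read through the KeyValueStore API.

```python
def related_products(product_table, viewed_product, limit=5):
    """Non-personalized scoring: return the stored neighbors of the viewed product.

    product_table: product -> list of (neighbor, similarity), most similar first,
    as produced by the batch job.
    """
    neighbors = product_table.get(viewed_product, [])
    return [neighbor for neighbor, _sim in neighbors[:limit]]

product_table = {"A": [("B", 0.9), ("D", 0.5), ("C", 0.2)]}
```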
It’s a relatively simple task to take this process a little further and personalize the results. In a personalized system, the scoring function would take a user’s recent ratings and use the KeyValueStore API to find products similar to the products that the user had rated. By combining the ratings and the product similarities stored in the products table, the application can predict the ratings that the user would give related items and offer recommendations of the products with the highest predicted ratings. By limiting both the number of ratings used and the number of similar products per rated product, the system can easily handle this operation as the user is interacting with the application.
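The personalized variant can be sketched with the standard item-based collaborative filtering prediction, which estimates user u's rating for product j as a similarity-weighted average of the ratings u gave to products similar to j:

predicted(u, j) = Σ_i sim(i, j) · r(u, i) / Σ_i |sim(i, j)|

The article does not name an exact formula, so treat this as one reasonable choice; the function, parameter names and data are hypothetical. max_ratings and max_neighbors implement the two limits described above.

```python
import heapq

def recommend_for_user(user_ratings, neighbors, max_ratings=10, max_neighbors=20, n=5):
    """Predict ratings for unrated products and return the n highest.

    user_ratings: list of (product, rating), most recent first
    neighbors:    product -> list of (similar product, similarity), most similar first
    """
    rated = {product for product, _ in user_ratings}
    weighted, weights = {}, {}
    for product, rating in user_ratings[:max_ratings]:
        for other, sim in neighbors.get(product, [])[:max_neighbors]:
            if other in rated:
                continue  # skip products the user has already rated
            weighted[other] = weighted.get(other, 0.0) + sim * rating
            weights[other] = weights.get(other, 0.0) + abs(sim)
    predicted = {p: weighted[p] / weights[p] for p in weighted if weights[p] > 0}
    return heapq.nlargest(n, predicted.items(), key=lambda item: item[1])

user_ratings = [("A", 5.0), ("C", 2.0)]
neighbors = {"A": [("B", 0.9), ("D", 0.5)], "C": [("D", 0.8), ("E", 0.6)]}
top = recommend_for_user(user_ratings, neighbors)
```

The work per request is bounded by max_ratings times max_neighbors lookups, which is what keeps this cheap enough to run while the user is interacting with the application.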

Conclusion

In this article, we've seen, at a high level, how Kiji can be used to develop a recommendation system that refreshes recommendations in real time. By leveraging HBase for low-latency data access, Avro to store complex data types, and MapReduce and Scalding to process data, applications can provide relevant recommendations to users in a real-time context. For those who are interested in seeing an example of this system, code for a very similar application is available on the WibiData GitHub.

About the Author

Jon Natkins (@nattyice) is a field engineer at WibiData, where he focuses on helping users build Big Data applications on Kiji and WibiEnterprise. Prior to WibiData, Jon worked in software engineering roles at Cloudera and Vertica Systems.

Costin Leau on Elasticsearch, BigData and Hadoop

Elasticsearch is an open-source, distributed, real-time search and analytics engine for the cloud. It's built on the Apache Lucene search engine library and provides full text search capabilities, multi-language support, a query language, support for geolocation, context-aware did-you-mean suggestions, autocomplete, and search snippets.
Elasticsearch supports a RESTful API using JSON over HTTP for all of its operations, whether search, analytics, or monitoring. In addition, native clients are available for languages such as Java, PHP, Perl, Python, and Ruby. Elasticsearch is available for use under the Apache 2 license. The first milestone of elasticsearch-hadoop, 1.3.M1, was released in early October.
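As a sketch of what JSON over HTTP means in practice, the following builds a search request with only the Python standard library. The node address (localhost:9200), the articles index and the title field are assumptions for illustration; the match query is part of Elasticsearch's query DSL. The request is constructed but not sent.

```python
import json
import urllib.request

def build_search_request(base_url, index, query_text):
    """Build (but do not send) a JSON-over-HTTP search request."""
    body = {"query": {"match": {"title": query_text}}}
    return urllib.request.Request(
        url=f"{base_url}/{index}/_search",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_search_request("http://localhost:9200", "articles", "hadoop")
# Passing req to urllib.request.urlopen would send it to a running node.
```

Because the protocol is just HTTP and JSON, any language with an HTTP client can issue the same request; the native clients mainly add convenience on top.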
InfoQ spoke with Costin Leau from the Elasticsearch team about the search and analytics engine and how it integrates with Hadoop and other Big Data technologies.
InfoQ: Hi Costin, can you describe what Elasticsearch is and how it helps with Big Data requirements?
Elasticsearch is a scalable, highly-available, open-source search and analytics engine based on Apache Lucene. It is easy to "dig" through your data and to "zoom" in and out, all in real time. At Elasticsearch, we’ve put a lot of work into delivering a good user experience out of the box. We set good defaults that make it easy to get started, but we also give you full access, when you need it, to customize virtually every aspect of the engine.
For example, you can use it to search your data, from typical queries ('find all items X that match Y') to filtering (or “views” in Elasticsearch terms), highlighted search snippets that provide context for each result, geolocation ('find all items within Z miles'), did-you-mean suggestions, and powerful aggregations (Elasticsearch’s “facets”) such as date histograms or statistics.
Elasticsearch can both search and store your data. It offers a semi-structured, schema-free, JSON-based model; you can just toss JSON documents at it and Elasticsearch will automatically detect your data types and index your documents, or you can customize the schema mapping to suit your purposes, e.g. boosting individual fields or documents, custom full text analysis, etc.
You can start with a small instance on your laptop or take it to the cloud with tens or hundreds of instances, all with minimal changes. Elasticsearch will automatically scale horizontally and grow with your app.
It runs on the JVM and uses JSON over a RESTful HTTP interface, so any client or language can interact with it. There is a plethora of clients and framework integrations in various languages that provide native APIs and dedicated DSLs to minimize 'friction' and maximize performance.
Elasticsearch is a great fit for "Big Data" because its scalable, distributed nature allows it to search and store vast amounts of information in near real time. Through the Elasticsearch-Hadoop project, we are enabling Hadoop users (including Hive, Pig and Cascading users) to enhance their workflows with a full-blown search engine. We give them a rich language to ask better questions in order to get clearer answers, significantly faster.

InfoQ: Elasticsearch is used for real-time full text search. Can you tell us how real-time full text search differs from traditional data search?
In layman’s terms, traditional search is a subset of full text search.
Search as implemented by most data stores is based on metadata or on parts of the original data; for efficiency reasons, a subset of the data that is considered relevant is indexed (such as the entry id, name, etc.) and the rest is ignored. This results in an index that is small compared to the data size, but one that doesn't fully cover the data set. Full text search alleviates this problem by indexing and searching the entire corpus, at the cost of greater storage requirements.
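The difference between indexing a subset and indexing the entire corpus can be illustrated with a toy inverted index. This is a simplification: Lucene's analyzers do far more than the regular-expression tokenizer used here, and the documents are invented.

```python
import re
from collections import defaultdict

def build_index(docs, fields):
    """Map each lowercase token in the given fields to the ids of documents containing it."""
    index = defaultdict(set)
    for doc_id, doc in docs.items():
        for field in fields:
            for token in re.findall(r"\w+", doc.get(field, "").lower()):
                index[token].add(doc_id)
    return index

docs = {
    1: {"title": "Quarterly report", "body": "Revenue grew in the EMEA region"},
    2: {"title": "Team offsite", "body": "Agenda and travel details"},
}
metadata_index = build_index(docs, ["title"])           # "traditional": titles only
full_text_index = build_index(docs, ["title", "body"])  # the entire corpus
```

The word "revenue" is findable only through the full-text index, which is also the larger of the two, matching the storage trade-off described above.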
Traditional search is typically associated with structured data because it is easier for the user to know what is relevant and what is not; however, when you look at today's requirements, most data is unstructured. Now, you store all data once and then, when necessary, look at it many times across several different formats and structures; a full-text search approach becomes mandatory in such cases, as you can no longer afford to just ignore data.
Elasticsearch supports both structured data search and full text search. It provides a wide variety of query options from keywords, Boolean queries, filters and fuzzy search just to name a few, all exposed via a rich query language.
Note that Elasticsearch provides more than simple full text search with features such as:
  • Geolocation: Find results based on their location.
  • Aggregation/Facets: aggregate your data as you query it, e.g. find the countries that visit your site for a certain article, or the tags on a given day. As aggregations are computed in real time, the aggregations change when queries change; in other words, you get immediate feedback on your data set.
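A single request can combine full text search, a geolocation filter and an aggregation. The sketch below builds such a body as a Python dict. It uses the bool/filter form of recent Elasticsearch versions; older releases expressed the same idea with a filtered query, and releases before 1.0 used facets rather than aggregations. The field names body, location (assumed to be mapped as a geo_point) and country are hypothetical.

```python
import json

def article_query(text, lat, lon, radius="10km"):
    """Full text match, restricted by distance, with a per-country breakdown of the hits."""
    return {
        "query": {
            "bool": {
                "must": {"match": {"body": text}},
                "filter": {
                    "geo_distance": {
                        "distance": radius,
                        "location": {"lat": lat, "lon": lon},
                    }
                },
            }
        },
        "aggs": {"by_country": {"terms": {"field": "country"}}},
    }

q = article_query("elasticsearch hadoop", 40.71, -74.0)
print(json.dumps(q, indent=2))
```

Since the aggregation runs over whatever the query matches, changing the search text or radius immediately changes the per-country counts, which is the "immediate feedback" described above.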
InfoQ: What are the design considerations when using Elasticsearch?
Data is king, so focus on that. In order for Elasticsearch to work with the data the way you want it to, it needs to understand your 'requirements'. While it can make best-effort guesses about your data, your domain knowledge is invaluable in configuring your setup to support your requirements. It all boils down to data granularity, or how the data is organized. To give you an example, take the logging case, which seems to be quite common: it's better to break the logs down into time periods, so you end up with an index per month, per week, or even per day, instead of having them all under one big index. This separation makes it easy to handle spikes in growth and the removal or archiving of old data.
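The index-per-period pattern can be sketched with a naming helper and a retention check. The logs-YYYY.MM.DD naming scheme and both helper functions are hypothetical; the point is that expiring old data becomes a matter of dropping whole indices rather than deleting individual documents.

```python
from datetime import date, timedelta

def daily_index(prefix, day):
    """Name of the index that holds one day's logs, e.g. logs-2014.08.15."""
    return f"{prefix}-{day:%Y.%m.%d}"

def indices_to_drop(existing, prefix, today, keep_days):
    """Indices outside the retention window; each one is dropped as a whole."""
    keep = {daily_index(prefix, today - timedelta(days=n)) for n in range(keep_days)}
    return sorted(name for name in existing if name.startswith(prefix + "-") and name not in keep)

today = date(2014, 8, 15)
existing = [daily_index("logs", today - timedelta(days=n)) for n in range(5)]
expired = indices_to_drop(existing, "logs", today, keep_days=3)
```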
InfoQ: Can you discuss the design and architecture patterns supported by the Elasticsearch engine?
An index consists of multiple shards, each of which is a “mini” search engine in its own right; an index is really a virtual namespace which points at a number of shards.  Having multiple shards makes it easy to scale out by just adding more nodes.  Having replica shards - copies of each primary shard - provides high availability and increased read throughput.
Querying an index is a distributed operation, meaning Elasticsearch has to query one copy of each shard in the index and collate the results into a single result set.  Querying multiple indices is just an extension of the same process.  This approach allows for enormous flexibility when provisioning your data store.
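The distributed query can be sketched as a scatter-gather: search one copy of every shard in parallel, keep each shard's local top hits, and merge them. Scoring by raw term count is a deliberate simplification of Lucene's relevance scoring, and the shard contents are invented.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

def search_shard(shard, term, size):
    """Search one shard: its top `size` (score, doc_id) hits, scored by term frequency."""
    hits = [(text.lower().split().count(term), doc_id) for doc_id, text in shard.items()]
    return heapq.nlargest(size, [hit for hit in hits if hit[0] > 0])

def search_index(shards, term, size=3):
    """Query every shard in parallel, then collate into one top-`size` result list."""
    with ThreadPoolExecutor() as pool:
        per_shard = list(pool.map(lambda shard: search_shard(shard, term, size), shards))
    # Each shard's local top `size` is enough to compute the global top `size`.
    return heapq.nlargest(size, (hit for hits in per_shard for hit in hits))

shards = [
    {"d1": "hadoop hadoop search", "d2": "spring data"},
    {"d3": "hadoop", "d4": "search search search"},
]
```

Querying several indices is the same operation over a longer list of shards, which is why it is described above as just an extension of the same process.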
With the domain-specific knowledge that a user has about their application, it is easy to optimize queries to hit only the relevant shards. This lets the same hardware support an even greater load.
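One way to aim a query at only the relevant shards is custom routing: Elasticsearch accepts a routing value when indexing and searching, and documents with the same routing value land on the same shard. The sketch below shows the idea with a hash of the routing value modulo the shard count. CRC32 is a stand-in; Elasticsearch uses its own hash function, and shard_for is a hypothetical helper.

```python
import zlib

def shard_for(routing_value, num_primary_shards):
    """Map a routing value (here a user id) to one primary shard, deterministically."""
    return zlib.crc32(routing_value.encode("utf-8")) % num_primary_shards

NUM_SHARDS = 5
# Index time: every document routed by "user-42" goes to the same shard.
home = shard_for("user-42", NUM_SHARDS)
# Query time: a search scoped to user-42 needs only that shard, not all five.
shards_to_query = {shard_for("user-42", NUM_SHARDS)}
```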
InfoQ: How does Elasticsearch support data scalability?
Elasticsearch is distributed by nature in order to be highly available and scalable. From a top-level view, Elasticsearch stores documents (or data records) under indices (or collections). Each collection is broken down into multiple pieces called shards; the bigger an index is, the more shards you want to allocate. (Don't be afraid to overdo it; shards are cheap.) Shards are distributed evenly across an Elasticsearch cluster, depending on your settings and size, for two reasons:
  • For redundancy reasons: By default, Elasticsearch keeps one copy (replica) of each shard, so if a node goes down, there's a backup ready to take its place.
  • For performance reasons: Each query is made on an index and is run in parallel across its shards. This workflow is the key component for improving performance; if things are slow, simply add more machines to the cluster, and Elasticsearch will automatically distribute the shards, and their queries, across the new nodes.
This approach gives organizations the freedom to scale both vertically (if a node is slow, upgrade the hardware) and horizontally (if a cluster is slow, add more nodes to increase its size).
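The placement rule behind the redundancy bullet can be sketched as a round-robin allocation that never puts two copies of the same shard on one node. This is a hypothetical simplification; Elasticsearch's real allocator balances shards by other criteria and leaves replicas unassigned, rather than failing, when there are too few nodes.

```python
def allocate(num_shards, num_replicas, nodes):
    """Round-robin placement that never puts two copies of a shard on the same node."""
    if num_replicas + 1 > len(nodes):
        raise ValueError("need at least one node per copy of a shard")
    placement = {}
    for shard in range(num_shards):
        # Copy 0 is the primary; copies 1..num_replicas are replicas on the following nodes.
        placement[shard] = [nodes[(shard + copy) % len(nodes)] for copy in range(num_replicas + 1)]
    return placement

three_nodes = allocate(4, 1, ["n1", "n2", "n3"])
four_nodes = allocate(4, 1, ["n1", "n2", "n3", "n4"])  # after scaling out
```

Rerunning the placement with an extra node spreads the same shards over more machines, which is the horizontal scaling described above.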
InfoQ: What are the limitations or cautions of using this solution?
The main challenge that we see is with users moving from a SQL world to what you could call a contextual search one. For retrieving individual data entries (the typical get), things are still the same: specify the id and get the data back. However, when it comes to data exploration, different constructs are used, from the type of analysis performed to the type of search or matching algorithm, e.g. fuzzy queries.
InfoQ: Can you talk about the advantages of using Elasticsearch along with Hadoop technology?
Hadoop by design is a distributed, batch-oriented platform for processing large data sets. While it's a very powerful tool, its batch nature means it takes some time to produce the results. Further, the user needs to code all operations from scratch. Libraries like Hive and Pig help, but don't solve the problem completely; imagine reimplementing geolocation in Map/Reduce.
With Elasticsearch, you can leave search to the search engine and focus on the other parts, such as data transformation. The Elasticsearch-Hadoop project provides native integration with Hadoop so there is no gap for the user to bridge; we provide dedicated InputFormat and OutputFormat for vanilla Map/Reduce, Taps for reading and writing data in Cascading, and Storages for Pig and Hive so you can access Elasticsearch just as if the data were in HDFS.
Usually, data stores integrated into Hadoop tend to become a bottleneck due to the number of requests generated by the tasks running in the cluster for each job. The distributed nature of the Map/Reduce model fits really well on top of Elasticsearch because we can correlate the number of Map/Reduce tasks with the number of Elasticsearch shards for a particular query. So every time a query is run, the system dynamically generates a number of Hadoop splits proportional to the number of shards available so that the jobs are run in parallel. Your Hadoop cluster can scale alongside Elasticsearch, and vice-versa.
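The shard-to-split mapping can be sketched with the simplest proportional rule, one input split per shard, where each split carries the host that holds its shard so a scheduler can place the task there. ShardSplit and splits_for_query are hypothetical names, not the classes elasticsearch-hadoop actually ships.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ShardSplit:
    index: str
    shard: int
    host: str  # machine holding the shard; a co-located task can read it locally

def splits_for_query(index, shard_hosts):
    """One input split per shard, so map tasks read every shard in parallel."""
    return [ShardSplit(index, shard, host) for shard, host in sorted(shard_hosts.items())]

splits = splits_for_query("logs", {0: "host-a", 1: "host-b", 2: "host-a"})
```

Carrying the host in each split is also what makes the data-locality point in the next answer possible.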
Furthermore, the integration enables cluster co-location by exposing shard information to Hadoop. Job tasks are run on the same machines as the Elasticsearch shards themselves, eliminating network traffic and improving performance through data locality. We actually recommend running Elasticsearch and Hadoop clusters on the same machines for this very reason, especially as they complement each other in terms of resource usage (IO vs. CPU).
Last but not least, Elasticsearch provides near real-time responses (think milliseconds) that significantly speed up a Hadoop job's execution and reduce the cost associated with it, especially when running on 'rented resources' such as Amazon EMR.
InfoQ: Is there any integration between Spring Framework and Elasticsearch?
Yes, check out the Spring Data Elasticsearch project on GitHub. The project was started by members of our community at BioMed Central, and we are happy to participate in its development by using and improving it. The project provides the well-known Spring template as a high-level abstraction, as well as Repository support on top of Elasticsearch and extensive configuration through XML, JavaConfig and CDI. We are currently looking into aggregating existing integrations under the same umbrella, most notably David Pilato's spring-elasticsearch.

About the Interviewee

Costin Leau is an engineer at Elasticsearch, currently working with NoSQL and Big Data technologies. An open-source veteran, Costin led various Spring projects and authored an OSGi specification.
