
Friday 31 October 2014

HBase via Hive

One of the things I’m frequently asked about is how to use HBase from Apache Hive. Not just how to do it, but what works, how well it works, and how to make good use of it. I’ve done a bit of research in this area, so hopefully this will be useful to someone besides myself. This is a topic we did not get to cover in HBase in Action; perhaps these notes will become the basis for the 2nd edition ;) These notes are applicable to Hive 0.11.x used in conjunction with HBase 0.94.x. They should be largely applicable to 0.12.x + 0.96.x, though I haven’t tested everything yet.

The Hive project includes an optional library for interacting with HBase. This is where the bridge layer between the two systems is implemented. The primary interface you use when accessing HBase from Hive queries is called the HBaseStorageHandler. You can also interact with HBase tables directly via Input and Output formats, but the handler is simpler and works for most uses.

HBase tables from Hive

Use the HBaseStorageHandler to register HBase tables with the Hive metastore. You can optionally specify the HBase table as EXTERNAL, in which case Hive will not create or drop that table directly – you’ll have to use the HBase shell to do so.
CREATE [EXTERNAL] TABLE foo(...)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
TBLPROPERTIES ('hbase.table.name' = 'bar');
The above statement registers the HBase table named bar in the Hive metastore, accessible from Hive by the name foo.
Under the hood, HBaseStorageHandler is delegating interaction with the HBase table to HiveHBaseTableInputFormat and HiveHBaseTableOutputFormat. You can register your HBase table in Hive using those classes directly if you desire. The above statement is roughly equivalent to:
CREATE TABLE foo(...)
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat'
TBLPROPERTIES ('hbase.table.name' = 'bar');
Also provided is the HiveHFileOutputFormat, which means it should be possible to generate HFiles for bulkloading from Hive as well. In practice, I haven’t gotten this to work end-to-end (see HIVE-4627).

Schema mapping

Registering the table is only the first step. As part of that registration, you also need to specify a column mapping. This is how you link Hive column names to the HBase table’s rowkey and columns. Do so using the hbase.columns.mapping SerDe property.
CREATE TABLE foo(rowkey STRING, a STRING, b STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,f:c1,f:c2')
TBLPROPERTIES ('hbase.table.name' = 'bar');
...
The values provided in the mapping property correspond one-for-one with column names of the Hive table. HBase column names are fully qualified by column family, and you use the special token :key to represent the rowkey. The above example makes rows from the HBase table bar available via the Hive table foo. The foo column rowkey maps to the HBase table’s rowkey, a to c1 in the f column family, and b to c2, also in the f family.

You can also associate Hive’s MAP data structures with HBase column families. In this case, only the STRING Hive type is used; the other Hive type currently supported is BINARY. See the wiki page for more examples.
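For example, an entire column family can be exposed as a Hive MAP by leaving the qualifier off in the mapping; each qualifier then becomes a map key and its cell value the map value. A minimal sketch, assuming a hypothetical HBase table named metrics with a column family m:
CREATE TABLE metrics_hive(rowkey STRING, m MAP<STRING, STRING>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,m:')
TBLPROPERTIES ('hbase.table.name' = 'metrics');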

Interacting with data

With the column mappings defined, you can now access HBase data just like you would any other Hive data. Only simple query predicates are currently supported.
SELECT * FROM foo WHERE ...;
You can also populate an HBase table using Hive. This works with both INTO and OVERWRITE clauses.
FROM source_hive_table INSERT INTO TABLE my_hbase_table
SELECT source_hive_table.* WHERE ...;
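For illustration, a complete statement using the OVERWRITE form might look like the following sketch; the id column and the predicate are hypothetical stand-ins for whatever filter your source table calls for.
FROM source_hive_table
INSERT OVERWRITE TABLE my_hbase_table
SELECT source_hive_table.*
WHERE source_hive_table.id IS NOT NULL;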
Be advised that there is a regression in Hive 0.12.0 which breaks this feature, see HIVE-5515.

In practice

There’s still a little finesse required to get everything wired up properly at runtime. The HBase interaction module is completely optional, so you have to make sure it and its HBase dependencies are available on Hive’s classpath.
$ export HADOOP_CLASSPATH=...
$ hive -e "CREATE TABLE ... STORED BY 'org.apache...HBaseStorageHandler'"
The installation environment could do a better job of handling this for users, but for the time being you must manage it yourself. Ideally the hive bin script would detect the presence of HBase and automatically make the necessary CLASSPATH adjustments. This enhancement appears to be tracked in HIVE-2055. The last mile is provided by the distribution itself, which ensures the environment variables are set for hive. This functionality is provided by BIGTOP-955.
You also need to make sure the necessary jars are shipped out to the MapReduce jobs when you execute your Hive statements. Hive provides a mechanism for shipping additional job dependencies via the auxjars feature.
$ export HIVE_AUX_JARS_PATH=...
$ hive -e "SELECT * FROM ..."
I did discover a small bug in HDP-1.3 builds which masks user-specified values of HIVE_AUX_JARS_PATH. With administrative rights, this is easily fixed by correcting the line in hive-env.sh to respect an existing value. The work-around in user scripts is to use the SET statement to provide a value once you’ve launched the Hive CLI.
SET hive.aux.jars.path = ...
Hive should be able to detect which jars are necessary and add them itself. HBase provides the TableMapReduceUtil#addDependencyJars methods for this purpose. It appears that this is done in Hive 0.12.0, at least according to HIVE-2379.

Future work

Much has been said about proper support for predicate pushdown (HIVE-1643, HIVE-2854, HIVE-3617, HIVE-3684) and data type awareness (HIVE-1245, HIVE-2599). These go hand-in-hand as predicate semantics are defined in terms of the types upon which they operate. More could be done to map Hive’s complex data types like Maps and Structs onto HBase column families as well (HIVE-3211). Support for HBase timestamps is a bit of a mess; they’re not made available to Hive applications with any level of granularity (HIVE-2828, HIVE-2306). The only interaction a user has is via a storage handler setting for writing a custom timestamp with all operations.

From a performance perspective, there are things Hive can do today (ie, not dependent on data types) to take advantage of HBase. There’s also the possibility of an HBase-aware Hive to make use of HBase tables as an intermediate storage location (HIVE-3565), facilitating map-side joins against dimension tables loaded into HBase. Hive could make use of HBase’s natural indexed structure (HIVE-3634, HIVE-3727), potentially saving huge scans.

Currently, the user doesn’t have (any?) control over the scans which are executed. Configuration on a per-job, or at least per-table, basis should be enabled (HIVE-1233). That would enable an HBase-savvy user to provide Hive with hints regarding how it should interact with HBase. Support for simple split sampling of HBase tables (HIVE-3399) could also be done easily because HBase manages table partitions already.

Other access channels

Everything discussed thus far has required Hive to interact with online HBase RegionServers. Applications may stand to gain significant throughput and enjoy greater flexibility by interacting directly with HBase data persisted to HDFS. This also has the benefit of preventing Hive workloads from interfering with online SLA-bound HBase applications (at least, until we see HBase improvements in QOS isolation between tasks, HBASE-4441).

As mentioned earlier, there is the HiveHFileOutputFormat. Resolving HIVE-4627 should make Hive a straightforward way to generate HFiles for bulk loading. Once you’ve created the HFiles using Hive, there’s still the last step of running the LoadIncrementalHFiles utility to copy and register them in the regions. For this, the HiveStorageHandler interface will need some kind of hook to influence the query plan as it’s created, allowing it to append steps. Once in place, it should be possible to SET a runtime flag, switching an INSERT operation to use bulkload.

HBase recently introduced the table snapshot feature. This allows a user to create a point-in-time view of a table, persisted to HDFS. HBase is able to restore a table from a snapshot to a previous state, and to create an entirely new table from an existing snapshot. Hive does not currently support reading from an HBase snapshot. For that matter, HBase doesn’t yet support MapReduce jobs over snapshots, though the feature is a work in progress (HBASE-8369).

Conclusions

The interface between HBase and Hive is young, but has nice potential. There’s a lot of low-hanging fruit that can be picked up to make things easier and faster. The most glaring issue barring real application development is the impedance mismatch between Hive’s typed, dense schema and HBase’s untyped, sparse schema. This is as much a cognitive problem as a technical issue. Solutions here would allow a number of improvements to fall out, including much in the way of performance improvements. I’m hopeful that continuing work to add data types to HBase (HBASE-8089) can help bridge this gap.

Basic operations mostly work, at least in a rudimentary way. You can read data out of and write data back into HBase using Hive. Configuring the environment is an opaque and manual process, one which likely stymies novices from adopting the tools. There’s also the question of bulk operations – support for writing HFiles and reading HBase snapshots using Hive is entirely lacking at this point. And of course, there are bugs sprinkled throughout. The biggest recent improvement is the deprecation of HCatalog’s interface, removing the need for an upfront decision regarding which interface to use.

Hive provides a very usable SQL interface on top of HBase, one which integrates easily into many existing ETL workflows. That interface requires simplifying some of the BigTable semantics HBase provides, but the result will be to open up HBase to a much broader audience of users. The Hive interop complements the experience provided by Phoenix extremely well. Hive has the benefit of not requiring the deployment complexities currently required by that system. Hopefully a common definition of types will allow a complementary future.


Grab some data and register it in Hive

We’ll need some data to work with. For this purpose, grab some traffic stats from Wikipedia. Once we have some data, copy it up to HDFS.
$ mkdir pagecounts ; cd pagecounts
$ for x in {0..9} ; do wget "http://dumps.wikimedia.org/other/pagecounts-raw/2008/2008-10/pagecounts-20081001-0${x}0000.gz" ; done
$ hadoop fs -copyFromLocal $(pwd) ./
For reference, this is what the data looks like.
$ zcat pagecounts-20081001-000000.gz | head -n5
aa.b Special:Statistics 1 837
aa Main_Page 4 41431
aa Special:ListUsers 1 5555
aa Special:Listusers 1 1052
aa Special:PrefixIndex/Comparison_of_Guaze%27s_Law_and_Coulomb%27s_Law 1 4332
As I understand it, each record is a count of page views of a specific page on Wikipedia. The first column is the language code, second is the page name, third is the number of page views, and fourth is the size of the page in bytes. Each file contains an hour’s worth of aggregated data. None of the above pages were particularly popular that hour.
Now that we have data and understand its raw schema, create a Hive table over it. To do that, we’ll use a DDL script that looks like this.
$ cat 00_pagecounts.ddl
-- define an external table over raw pagecounts data
CREATE TABLE IF NOT EXISTS pagecounts (projectcode STRING, pagename STRING, pageviews STRING, bytes STRING)
ROW FORMAT
  DELIMITED FIELDS TERMINATED BY ' '
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/ndimiduk/pagecounts';
Run the script to register our dataset with Hive.
$ hive -f 00_pagecounts.ddl
OK
Time taken: 2.268 seconds
Verify that the schema mapping works by calculating a simple statistic over the dataset.
$ hive -e "SELECT count(*) FROM pagecounts;"
Total MapReduce jobs = 1
Launching Job 1 out of 1
...
OK
36668549
Time taken: 25.31 seconds, Fetched: 1 row(s)
Hive says the 10 files we downloaded contain roughly 36.7 million records. Let’s just confirm things are working as expected by getting a second opinion. This isn’t that much data, so confirm on the command line.
$ zcat * | wc -l
36668549
The record counts match up – excellent.

Transform the schema for HBase

The next step is to transform the raw data into a schema that makes sense for HBase. In our case, we’ll create a schema that allows us to calculate aggregate summaries of pages according to their titles. To do this, we want all the data for a single page grouped together. We’ll manage that by creating a Hive view that represents our target HBase schema. Here’s the DDL.
$ cat 01_pgc.ddl
-- create a view, building a custom hbase rowkey
CREATE VIEW IF NOT EXISTS pgc (rowkey, pageviews, bytes) AS
SELECT concat_ws('/',
         projectcode,
         concat_ws('/',
           pagename,
           regexp_extract(INPUT__FILE__NAME, 'pagecounts-(\\d{8}-\\d{6})\\..*$', 1))),
       pageviews, bytes
FROM pagecounts;
The SELECT statement uses Hive to build a compound rowkey for HBase. It concatenates the project code, page name, and date, joined by the '/' character. A handy trick: it uses a simple regex to extract the date from the source file names. Run it now.
$ hive -f 01_pgc.ddl
OK
Time taken: 2.712 seconds
This is just a view, so the SELECT statement won’t be evaluated until we query it for data. Registering it with Hive doesn’t actually process any data. Again, make sure it works by querying Hive for a subset of the data.
$ hive -e "SELECT * FROM pgc WHERE rowkey LIKE 'en/q%' LIMIT 10;"
Total MapReduce jobs = 1
Launching Job 1 out of 1
...
OK
en/q:Special:Search/Blues/20081001-090000       1       1168
en/q:Special:Search/rock/20081001-090000        1       985
en/qadam_rasul/20081001-090000  1       1108
en/qarqay/20081001-090000       1       933
en/qemu/20081001-090000 1       1144
en/qian_lin/20081001-090000     1       918
en/qiang_(spear)/20081001-090000        1       973
en/qin_dynasty/20081001-090000  1       1120
en/qinghe_special_steel_corporation_disaster/20081001-090000    1       963
en/qmail/20081001-090000        1       1146
Time taken: 40.382 seconds, Fetched: 10 row(s)
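As an aside, the regexp_extract expression used in the view can be sanity-checked in isolation. A quick hypothetical query (the FROM ... LIMIT 1 is only there because this Hive version wants a source table) should return 20081001-000000:
SELECT regexp_extract('pagecounts-20081001-000000.gz',
                      'pagecounts-(\\d{8}-\\d{6})\\..*$', 1)
FROM pagecounts LIMIT 1;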

Register the HBase table

Now that we have a dataset in Hive, it’s time to introduce HBase. The first step is to register our HBase table in Hive so that we can interact with it using Hive queries. That means another DDL statement. Here’s what it looks like.
$ cat 02_pagecounts_hbase.ddl
-- create a table in hbase to host the view
CREATE TABLE IF NOT EXISTS pagecounts_hbase (rowkey STRING, pageviews STRING, bytes STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,f:c1,f:c2')
TBLPROPERTIES ('hbase.table.name' = 'pagecounts');
This statement will tell Hive to go create an HBase table named pagecounts with the single column family f. It registers that HBase table in the Hive metastore by the name pagecounts_hbase with 3 columns: rowkey, pageviews, and bytes. The SerDe property hbase.columns.mapping makes the association from Hive column to HBase column. It says the Hive column rowkey is mapped to the HBase table’s rowkey, the Hive column pageviews to the HBase column f:c1, and bytes to the HBase column f:c2. To keep the example simple, we have Hive treat all these columns as the STRING type.

In order to use the HBase library, we need to make the HBase jars and configuration available to the local Hive process (at least until HIVE-5518 is resolved). Do that by specifying a value for the HADOOP_CLASSPATH environment variable before executing the statement.
$ export HADOOP_CLASSPATH=/etc/hbase/conf:/usr/lib/hbase/hbase-0.94.6.1.3.2.0-111-security.jar:/usr/lib/zookeeper/zookeeper.jar
$ hive -f 02_pagecounts_hbase.ddl
OK
Time taken: 4.399 seconds
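If you want to confirm what Hive just created on the HBase side, a quick optional check from the HBase shell should list the new pagecounts table with its single column family f:
$ echo "describe 'pagecounts'" | hbase shell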

Populate the HBase table

Now it’s time to write data to HBase. This is done using a regular Hive INSERT statement, sourcing data from the view with SELECT. There’s one more bit of administration we need to take care of though. This INSERT statement will run a mapreduce job that writes data to HBase. That means we need to tell Hive to ship the HBase jars and dependencies with the job.
Note that this is a separate step from the classpath modification we did previously. Normally you can do this with an export statement from the shell, the same way we specified the HADOOP_CLASSPATH. However there’s a bug in HDP-1.3 that requires me to use Hive’s SET statement in the script instead.
$ cat 03_populate_hbase.hql
-- ensure hbase dependency jars are shipped with the MR job
-- Should export HIVE_AUX_JARS_PATH but this is broken in HDP-1.3.x
SET hive.aux.jars.path = file:///etc/hbase/conf/hbase-site.xml,file:///usr/lib/hive/lib/hive-hbase-handler-0.11.0.1.3.2.0-111.jar,file:///usr/lib/hbase/hbase-0.94.6.1.3.2.0-111-security.jar,file:///usr/lib/zookeeper/zookeeper-3.4.5.1.3.2.0-111.jar;

-- populate our hbase table
FROM pgc INSERT INTO TABLE pagecounts_hbase SELECT pgc.* WHERE rowkey LIKE 'en/q%' LIMIT 10;
Note there’s a big ugly bug in Hive 0.12.0 which means this doesn’t work with that version. Never fear though, we have a patch in progress. Follow along at HIVE-5515.
If you choose to use a different method for setting Hive’s auxpath, be advised that it’s a tricky process – depending on how you specify it (HIVE_AUX_JARS_PATH, --auxpath), Hive will interpret the argument differently. HIVE-2349 seeks to remedy this unfortunate state of affairs.
$ hive -f 03_populate_hbase.hql
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
...
OK
Time taken: 40.296 seconds
Be advised also that this step is currently broken on secured HBase deployments. Follow along at HIVE-5523 if that’s of interest to you.

Query data from HBase-land

40 seconds later, you now have data in HBase. Let’s have a look using the HBase shell.
$ echo "scan 'pagecounts'" | hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.94.6.1.3.2.0-111, r410a7a1c151ca953553eae68aa84e2a9f0d6e4ca, Mon Aug 19 19:00:12 PDT 2013

scan 'pagecounts'
ROW                                                 COLUMN+CELL
 en/q:Pan%27s_Labyrinth/20081001-080000             column=f:c1, timestamp=1381534232485, value=1
 en/q:Pan%27s_Labyrinth/20081001-080000             column=f:c2, timestamp=1381534232485, value=1153
 en/q:Special:Search/Jazz/20081001-080000           column=f:c1, timestamp=1381534232485, value=1
 en/q:Special:Search/Jazz/20081001-080000           column=f:c2, timestamp=1381534232485, value=980
 en/q:Special:Search/peinture/20081001-080000       column=f:c1, timestamp=1381534232485, value=1
 en/q:Special:Search/peinture/20081001-080000       column=f:c2, timestamp=1381534232485, value=989
 en/q:Special:Search/rock/20081001-080000           column=f:c1, timestamp=1381534232485, value=1
 en/q:Special:Search/rock/20081001-080000           column=f:c2, timestamp=1381534232485, value=980
 en/qadi/20081001-080000                            column=f:c1, timestamp=1381534232485, value=1
 en/qadi/20081001-080000                            column=f:c2, timestamp=1381534232485, value=1112
 en/qalawun%20complex/20081001-080000               column=f:c1, timestamp=1381534232485, value=1
 en/qalawun%20complex/20081001-080000               column=f:c2, timestamp=1381534232485, value=942
 en/qalawun/20081001-080000                         column=f:c1, timestamp=1381534232485, value=1
 en/qalawun/20081001-080000                         column=f:c2, timestamp=1381534232485, value=929
 en/qari'/20081001-080000                           column=f:c1, timestamp=1381534232485, value=1
 en/qari'/20081001-080000                           column=f:c2, timestamp=1381534232485, value=929
 en/qasvin/20081001-080000                          column=f:c1, timestamp=1381534232485, value=1
 en/qasvin/20081001-080000                          column=f:c2, timestamp=1381534232485, value=921
 en/qemu/20081001-080000                            column=f:c1, timestamp=1381534232485, value=1
 en/qemu/20081001-080000                            column=f:c2, timestamp=1381534232485, value=1157
10 row(s) in 0.4960 seconds
Here we have 10 rows with two columns each containing the data loaded using Hive. It’s now accessible in your online world using HBase. For example, perhaps you receive an updated data file and have a corrected value for one of the stats. You can update the record in HBase with a regular PUT command.
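For example, a correction issued from the HBase shell against one of the rows shown above might look like this sketch (the new value of 2 is made up purely for illustration):
$ echo "put 'pagecounts', 'en/qemu/20081001-080000', 'f:c1', '2'" | hbase shell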

Verify data from Hive

The HBase table remains available to your Hive world; Hive’s HBaseStorageHandler works both ways, after all.
Note that this command expects that HADOOP_CLASSPATH is still set, and HIVE_AUX_JARS_PATH as well if your query is complex enough to launch a MapReduce job.
$ hive -e "SELECT * from pagecounts_hbase;"
OK
en/q:Pan%27s_Labyrinth/20081001-080000  1       1153
en/q:Special:Search/Jazz/20081001-080000        1       980
en/q:Special:Search/peinture/20081001-080000    1       989
en/q:Special:Search/rock/20081001-080000        1       980
en/qadi/20081001-080000 1       1112
en/qalawun%20complex/20081001-080000    1       942
en/qalawun/20081001-080000      1       929
en/qari'/20081001-080000        1       929
en/qasvin/20081001-080000       1       921
en/qemu/20081001-080000 1       1157
Time taken: 2.554 seconds, Fetched: 10 row(s)

Continue using Hive for analysis

Since the HBase table is accessible from Hive, you can continue to use Hive for your ETL processing with mapreduce. Keep in mind that the auxpath considerations apply here too, so I’ve scripted out the query instead of just running it directly at the command line.
$ cat 04_query_hbase.hql
-- ensure hbase dependency jars are shipped with the MR job
-- Should export HIVE_AUX_JARS_PATH but this is broken in HDP-1.3.x
SET hive.aux.jars.path = file:///etc/hbase/conf/hbase-site.xml,file:///usr/lib/hive/lib/hive-hbase-handler-0.11.0.1.3.2.0-111.jar,file:///usr/lib/hbase/hbase-0.94.6.1.3.2.0-111-security.jar,file:///usr/lib/zookeeper/zookeeper-3.4.5.1.3.2.0-111.jar;

-- query hive data
SELECT count(*) from pagecounts_hbase;
Run it the same way we did the others.
$ hive -f 04_query_hbase.hql
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
...
OK
10
Time taken: 19.473 seconds, Fetched: 1 row(s)
There you have it: a hands-on, end-to-end demonstration of interacting with HBase from Hive. You can learn more about the nitty-gritty details in Enis’s deck on the topic, or see the presentation he and Ashutosh gave at HBaseCon. If you’re inclined to make the intersection of these technologies work better (faster, stronger), I encourage you to pick up any of the JIRA issues mentioned in this post or the previous.

Happy hacking!

Facebook's New Realtime Analytics System: HBase to Process 20 Billion Events Per Day

Facebook did it again. They've built another system capable of doing something useful with ginormous streams of realtime data. Last time we saw Facebook release their New Real-Time Messaging System: HBase To Store 135+ Billion Messages A Month. This time it's a realtime analytics system handling over 20 billion events per day (200,000 events per second) with a lag of less than 30 seconds.
Alex Himel, Engineering Manager at Facebook, explains what they've built (video) and the scale required:
Social plugins have become an important and growing source of traffic for millions of websites over the past year. We released a new version of Insights for Websites last week to give site owners better analytics on how people interact with their content and to help them optimize their websites in real time. To accomplish this, we had to engineer a system that could process over 20 billion events per day (200,000 events per second) with a lag of less than 30 seconds.
Alex does an excellent job with the presentation. Highly recommended. But let's take a little deeper look at what's going on...
The need for such a high powered analytics system is driven by Facebook's brilliant plan for world wide web domination via the viral propagation of social plugins, all tying the non-Facebook web back into Facebook and the Facebook web back into the non-Facebook web. Basically anything that people can do is captured and fed back through Facebook and anything done on Facebook can be displayed on your website, building closer relations between the two.
Facebook's Social Plugins are Roman Empire Management 101. You don't have to conquer everyone to build an empire. You just have to control everyone with the threat that they could be conquered, while making them realize, oh by the way, there's lots of money to be made being friendly with Rome. This strategy worked for quite a while as I recall.
You've no doubt seen Social Plugins on websites out in the wild. A social plugin lets you see what your friends have liked, commented on or shared on sites across the web. The idea is putting social plugins on a site makes the content more engaging. Your friends can see what you are liking and in turn websites can see what everyone is liking. Content that is engaging gives you more clicks, more likes, and more comments. For a business or brand, or even an individual, the more engaging the content is, the more people see it, the more it pops up in news feeds, the more it drives traffic to a site.
The formerly lone-wolf web, where content hunters stalked web sites silently and singly, has been turned into a charming little village, where everyone knows your name. That's the power of social.
Posts here on HighScalability, for example, now have Like buttons. TechCrunch has famously moved to using Facebook's commenting system. Immediately the debate centered on the quality of the comment system itself, but that was hardly the point, the point was to plunge TechCrunch more deeply into Facebook's ecosystem of 500+ million users. Other plugins include: Recommendations, Activity Feed, Login, Registration, Facepile, and Live Stream.
All that data doesn't mean much unless you can make sense of it and also prove to content providers that social plugins actually do make their sites more engaging. That's where Facebook's Insights System comes in. It's an analytics system giving you access to all that juicy data being collected. It offers stats like Like button analytics, Comments box analytics, Popular pages, Demographics, and Organic sharing.
Imagine millions of websites and billions of pages and millions of people continually streaming data in via these social plugins. How do you make sense of all that data in real-time? It's a challenging problem.

Value Proposition

With an Insights System content producers can see what people like, which will enable content producers to generate more of what people like, which raises the content quality of the web, which gives users a better Facebook experience.

System Goals

  • Give people realtime counters, in a very reliable way, across a bunch of different metrics, and account for data skew.
  • Provide anonymous data. You can not figure out who the people are. 
  • Show why plugins are valuable. What value is your business deriving from it?
  • Make the data more actionable. Help users take action to make their content more valuable.
    • New UI metaphors. Use the idea of a funnel. 
    • How many people see a plugin, how many people take action on it, and how many are converted to traffic back on your site.  
  • Make the data more timely. 
    • They went realtime. Went from a 48-hour turn around to 30 seconds.
    • Multiple points of failure were removed to make this goal. 

Challenges

  • Lots of Event Types
    • Tracking 100+ metrics.
    • Plugin impressions.
    • Likes
    • News Feed Impressions
    • News Feed Clicks
    • Demographics
  • Massive Amounts of Data
    • 20 billion events per day (200,000 events per second)
  • Data Skew - Uneven Distribution of Keys
    • Likes follow something like a power law distribution. The long tail gets very few likes, but some resources get huge numbers of likes.
    • This brings up issues of hot regions, hot keys, and lock contention.

 Implemented a Bunch of Different Prototypes

  • MySQL DB Counters
    • Have a row with a key and a counter.
    • Results in lots of database activity.
    • Stats are kept at a day bucket granularity. Every day at midnight the stats would roll over. 
      • When the roll over period is reached this resulted in a lot of writes to the database, which caused a lot of lock contention.
      • Tried to spread the work by taking into account time zones. 
      • Tried to shard things differently.
    • The high write rate led to lock contention, it was easy to overload the databases, had to constantly monitor the databases, and had to rethink their sharding strategy.
    • Solution not well tailored to the problem.
  • In-Memory Counters
    • If you are worried about bottlenecks in IO then throw it all in-memory.
    • No scale issues. Counters are stored in memory so writes are fast and the counters are easy to shard.
    • Felt in-memory counters, for reasons not explained, weren't as accurate as other approaches. Even a 1% failure rate would be unacceptable. Analytics drive money so the counters have to be highly accurate. 
    • They didn't implement this system. It was a thought experiment and the accuracy issue caused them to move on.
  • MapReduce
    • Used Hadoop/Hive for previous solution. 
    • Flexible. Easy to get running. Can handle IO, both massive writes and reads. Don't have to know how they will query ahead of time. The data can be stored and then queried.
    • Not realtime. Many dependencies. Lots of points of failure. Complicated system. Not dependable enough to hit realtime goals.
  • Cassandra
    • HBase seemed a better solution based on availability and the write rate.
    • Write rate was the huge bottleneck being solved.

The Winner: HBase + Scribe + Ptail + Puma

  • At a high level:
    • HBase stores data across distributed machines.
    • Use a tailing architecture, new events are stored in log files, and the logs are tailed.
    • A system rolls the events up and writes them into storage.
    • A UI pulls the data out and displays it to users.
  • Data Flow
    • User clicks Like on a web page.
    • Fires AJAX request to Facebook.
    • Request is written to a log file using Scribe. 
      • Scribe handles issues like file roll over.
      • Scribe is built on the same HDFS file store Hadoop is built on.
      • Write extremely lean log lines. The more compact the log lines the more can be stored in memory.
    • Ptail
      • Data is read from the log files using Ptail. Ptail is an internal tool built to aggregate data from multiple Scribe stores. It tails the log files and pulls data out.
      • Ptail data is separated out into three streams so they can eventually be sent to their own clusters in different datacenters.
        • Plugin impression
        • News feed impressions
        • Actions (plugin + news feed)
    • Puma
      • Batch data to lessen the impact of hot keys. Even though HBase can handle a lot of writes per second they still want to batch data. A hot article will generate a lot of impressions and news feed impressions which will cause huge data skews which will cause IO issues. The more batching the better.
      • Batch for 1.5 seconds on average. Would like to batch longer but they have so many URLs that they run out of memory when creating a hashtable.
      • Wait for the last flush to complete before starting a new batch to avoid lock contention issues.
    • UI  Renders Data
      • Frontends are all written in PHP.
      • The backend is written in Java and Thrift is used as the messaging format so PHP programs can query Java services.
    • Caching solutions are used to make the web pages display more quickly.
      • Performance varies by the statistic. A counter can come back quickly. Finding the top URL in a domain can take longer. Times range from 0.5 to a few seconds.
      • The more and longer data is cached the less realtime it is.
      • Set different caching TTLs in memcache.
    • MapReduce
      • The data is then sent to MapReduce servers so it can be queried via Hive.
      • This also serves as a backup plan as the data can be recovered from Hive.
      • Raw logs are removed after a period of time.
  • HBase is a distributed column store.
    • Database interface to Hadoop. Facebook has people working internally on HBase. 
    • Unlike a relational database you don't create mappings between tables.
    • You don't create indexes. The only index you have is the primary row key.
    • From the row key you can have millions of sparse columns of storage. It's very flexible. You don't have to specify the schema. You define column families to which you can add keys at anytime.
    • Key feature to scalability and reliability is the WAL, write ahead log, which is a log of the operations that are supposed to occur. 
      • Based on the key, data is sharded to a region server. 
      • Written to WAL first.
      • Data is put into memory. At some point in time or if enough data has been accumulated the data is flushed to disk.
      • If the machine goes down you can recreate the data from the WAL. So there's no permanent data loss.
      • Using a combination of the log and in-memory storage, they can handle an extremely high rate of IO reliably.
    • HBase handles failure detection and automatically routes across failures.
    • Currently HBase resharding is done manually.
      • Automatic hot spot detection and resharding is on the roadmap for HBase, but it's not there yet.
      • Every Tuesday someone looks at the keys and decides what changes to make in the sharding plan.
  • Schema 
    • Store on a per URL basis a bunch of counters.
    • A row key, which is the only lookup key, is the MD5 hash of the reverse domain.
      • Selecting the proper key structure helps with scanning and sharding.
      • A problem they have is sharding data properly onto different machines. Using a MD5 hash makes it easier to say this range goes here and that range goes there. 
      • For URLs they do something similar, plus they add an ID on top of that. Every URL in Facebook is represented by a unique ID, which is used to help with sharding.
      • A reverse domain, com.facebook/ for example, is used so that the data is clustered together. HBase is really good at scanning clustered data, so if they store the data so it's clustered together they can efficiently calculate stats across domains. 
    • Think of every row as a URL and every cell as a counter; you are able to set different TTLs (time to live) for each cell. So if keeping an hourly count, there's no reason to keep that around for every URL forever, so they set a TTL of two weeks. TTLs are typically set on a per column family basis.
  • Per server they can handle 10,000 writes per second. 
  • Checkpointing is used to prevent data loss when reading data from log files. 
    • Tailers save log stream checkpoints in HBase.
    • Replayed on startup so won't lose data.
  • Useful for detecting click fraud, but it doesn't have fraud detection built in.
  • Tailer Hot Spots
    • In a distributed system there's a chance one part of the system can be hotter than another.
    • One example are region servers that can be hot because more keys are being directed that way.
    • One tailer can lag behind another too.
    • If one tailer is an hour behind and the others are up to date, what numbers do you display in the UI?
      • For example, impressions have a way higher volume than actions, so CTR rates were way higher in the last hour.
      • The solution is to figure out the least up-to-date tailer and use that when querying metrics.
  • Future Directions
    • Top Lists
      • Very hard to find the top URLs, the URLs with the most likes, for domains like YouTube which have millions of URLs shared very quickly. 
      • Need more creative solutions to maintain an in-memory sort and keep it up to date as data changes.
    • Distinct User Counts
      • How many people across a time window liked a URL. Easy to do in MapReduce, hard to do with a naive counter solution.
    • Generalize for Applications other than Social Plugins
    • Move to Multiple Datacenters
      • Single datacenter currently, but hope to move to multiple datacenters.
      • Current fallback plan is to use the MapReduce system.
      • The backup systems are tested each night. Queries against Hive and this new system are compared to see that they match. 
  • Project
    • Took about 5 months.
    • Two engineers first started working on the project. Then a 50% engineer was added.
    • Two UI people worked on the front-end.
    • Looks like about 14 people worked on the product across engineering, design, PM, and operations.
When we look at the messaging system and this analytics system, we notice what the two systems have in common: large numbers, HBase, real-time. The challenge of dealing with huge write loads in a reliable and timely fashion is a common substrate to these problems. Facebook is focusing on the HBase, Hadoop, HDFS ecosystem and counting on the operational quirks to be ironed out later. Others choose Cassandra because they love its scalability, multi-datacenter functionality, and ease of operational use, but it doesn't fit as cleanly into the overall analytics stack.
What does this mean for you? Even if you aren't Facebook, this architecture is simple enough and composed of enough off-the-shelf tools that it could work for much smaller projects too.

Facebook: The Underlying Technology of Messages

We're launching a new version of Messages today that combines chat, SMS, email, and Messages into a real-time conversation. The product team spent the last year building out a robust, scalable infrastructure. As we launch the product, we wanted to share some details about the technology.

The current Messages infrastructure handles over 350 million users sending over 15 billion person-to-person messages per month. Our chat service supports over 300 million users who send over 120 billion messages per month. By monitoring usage, two general data patterns emerged:
  1. A short set of temporal data that tends to be volatile
  2. An ever-growing set of data that rarely gets accessed
When we started investigating a replacement for the existing Messages infrastructure, we wanted to take an objective approach to storage for these two usage patterns. In 2008 we open-sourced Cassandra, an eventual-consistency key-value store that was already in production serving traffic for Inbox Search. Our Operations and Databases teams have extensive knowledge in managing and running MySQL, so switching off of either technology was a serious concern. We either had to move away from our investment in Cassandra or train our Operations teams to support a new, large system.

We spent a few weeks setting up a test framework to evaluate clusters of MySQL, Apache Cassandra, Apache HBase, and a couple of other systems. We ultimately chose HBase. MySQL proved to not handle the long tail of data well; as indexes and data sets grew large, performance suffered. We found Cassandra's eventual consistency model to be a difficult pattern to reconcile for our new Messages infrastructure.

HBase comes with very good scalability and performance for this workload and a simpler consistency model than Cassandra. While we’ve done a lot of work on HBase itself over the past year, when we started we also found it to be the most feature rich in terms of our requirements (auto load balancing and failover, compression support, multiple shards per server, etc.). HDFS, the underlying filesystem used by HBase, provides several nice features such as replication, end-to-end checksums, and automatic rebalancing. Additionally, our technical teams already had a lot of development and operational expertise in HDFS from data processing with Hadoop. Since we started working on HBase, we've been focused on committing our changes back to HBase itself and working closely with the community. The open source release of HBase is what we’re running today.

Since Messages accepts data from many sources such as email and SMS, we decided to write an application server from scratch instead of using our generic Web infrastructure to handle all decision making for a user's messages. It interfaces with a large number of other services: we store attachments in Haystack, wrote a user discovery service on top of Apache ZooKeeper, and talk to other infrastructure services for email account verification, friend relationships, privacy decisions, and delivery decisions (for example, should a message be sent over chat or SMS). We spent a lot of time making sure each of these services are reliable, robust, and performant enough to handle a real-time messaging system.

The new Messages will launch over 20 new infrastructure services to ensure you have a great product experience. We hope you enjoy using it.

Kannan is a software engineer at Facebook.

Monday 20 October 2014

Let's Discuss HBase - Row Key Design


HBase Row-Key Design: Design Solutions and Pros/Cons


Row-Key Design
Try to keep row keys short because they are stored with each cell in an HBase table; noticeably reducing
row-key size reduces the amount of data needed to store HBase data. This advice also applies to column family names.

A common problem is choosing between sequential row keys and randomly distributed row keys:

Some mixed-design approaches allow fast range scans while distributing writes across the cluster when writing
sequential (by nature) data; a sketch of such a salted key follows the pros/cons below.

Design Solution: Using sequential row keys (e.g. time-series data with a row key built from a timestamp)
Pros: Makes it possible to perform fast range scans with the help of setting start/stop keys on the Scanner
Cons: Creates single-region-server hotspotting problems when writing data (as row keys go in sequence,
all records end up written into a single region at a time)

Design Solution: Using randomly distributed row keys (e.g. UUIDs)
Pros: Aims for the fastest writing performance by distributing new records over random regions
Cons: Does not allow fast range scans over the written data
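One such mixed design is to prefix an otherwise sequential key with a small salt derived from a hash, so writes spread over a fixed number of buckets while range scans remain possible by running one scan per salt bucket. A minimal HiveQL sketch (echoing the compound-rowkey trick used elsewhere on this blog), assuming a hypothetical sensor_events table and 16 salt buckets:
SELECT concat_ws('/',
         lpad(CAST(pmod(hash(sensor_id), 16) AS STRING), 2, '0'),
         sensor_id,
         event_time) AS rowkey,
       reading
FROM sensor_events;
A range scan for one sensor then becomes 16 smaller scans, one per salt prefix, which is the price paid for spreading the write load.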


Column Families
Currently, HBase does not do well with anything above two or three column families per table. With that said,
keep the number of column families in your schema low. Try to make do with one column family in your schemata
if you can. Only introduce a second or third column family in the case where data access is usually
column-scoped; i.e. you usually query no more than a single column family at one time.

You can also set a TTL (in seconds) for a column family. HBase will automatically delete rows once they reach
the expiration time.
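From the HBase shell, a family-level TTL can be set when the table is created or changed later; a hedged sketch using a hypothetical table named events (1209600 seconds is two weeks):
create 'events', {NAME => 'f', TTL => 1209600}
disable 'events'
alter 'events', {NAME => 'f', TTL => 604800}
enable 'events'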

Versions
The maximum number of row versions that can be stored is configured per column family (the default is 3). 
This is an important parameter because HBase does not overwrite row values, but rather stores different values
per row by time (and qualifier). Setting the number of maximum versions to an exceedingly high level 
(e.g., hundreds or more) is not a good idea because that will greatly increase StoreFile size.

The minimum number of row versions to keep can also be configured per column family (the default is 0, meaning
the feature is disabled). This parameter is used together with the TTL and maximum row versions parameters to allow
configurations such as “keep the last T minutes worth of data, at least M versions, and at most N versions.”
This parameter should only be set when TTL is enabled for a column family and must be less than the maximum number of row versions.
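Continuing the hypothetical events table from the sketch above, these settings can be combined on a single column family from the shell; the intent below is roughly “keep a day of data, at least 1 version, and at most 5 versions”:
disable 'events'
alter 'events', {NAME => 'f', VERSIONS => 5, MIN_VERSIONS => 1, TTL => 86400}
enable 'events'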

Data Types
HBase supports a “bytes-in/bytes-out” interface via Put and Result, so anything that can be converted to an 
array of bytes can be stored as a value. Input can be strings, numbers, complex objects, or even images, as long as they can be rendered as bytes.

One supported data type that deserves special mention is the “counters” type. 
This type enables atomic increments of numbers.
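Counters are driven through increments rather than plain Puts. From the HBase shell the equivalent operations look like this sketch (the counters table and row used here are hypothetical):
incr 'counters', 'com.example/page1', 'f:views', 1
get_counter 'counters', 'com.example/page1', 'f:views'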

Some case studies for designing the row key:

Friday 17 October 2014

Speeding up a Pig+HBase MapReduce job by a factor of 15

The other day I ran a Pig script. Nothing fancy; I loaded some data into HBase and then ran a second Pig job to do some aggregations. I knew the data loading would take some time as it was multiple GB of data, but I expected the second aggregation job to run much faster. It ran for over 15 hours and was not done at that time. This was too long in my mind and I terminated it. I was using Amazon Elastic MapReduce as my Hadoop environment, so I could have added more resources, but I wasn’t sure if that would do me any good. I took a look at the performance breakdowns, and within minutes I found the problem. Not only that, I realized that even an expert would have had a hard time identifying it. What’s more, the go-to fix for more performance in Hadoop, which is adding more hardware, would not have helped at all!

Pig vs. Hive

Pig is one of the two predominant ways to access, transform and query data in Hadoop. Whereas Hive uses a SQL-like approach that appeals to database people and analysts, Pig is a scripting language. Its appeal lies in its procedural approach and its ability to transform data in a pipeline fashion, which appeals to ETL (Extract Transform Load) experts. Pig is typically used to load data into Hadoop and perform complex transformations and aggregations that would be hard to do in a single Hive query. Both systems generate one or more MapReduce jobs and therein lies their power and complexity.

My MapReduce Job

For demonstration purposes I simply used a kmer index job to reproduce the issue; if you are interested in running it yourself, get it from: https://github.com/lila/emr-hbase-sample
The job consists of two Amazon EMR steps in a single job flow. The first Pig script loads multiple gigabytes of genome sequence data into a single HBase table (I got some additional fasta sample genome files to do this, compared to the original sample). The second Pig job produces a size-based index. The respective Pig Latin looks like this:
A = LOAD 'hbase://kmer'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'sequence:*', '-loadKey true')
AS (id:bytearray, sequence:map[]);
B = foreach A generate id, SIZE(sequence);
C = group B by $1 PARALLEL $p;
D = foreach C generate group, COUNT(B);
I ran it on an empty HBase table and looked at the performance results.

The Loading Job

As mentioned, the first step was a Pig job that loaded several gigabytes of data into a simple HBase table. When looking at it we can see a couple of things immediately:
The MapReduce Task is sending inserts to multiple HBase Region servers, thus not storing the data locally
First of all the HBase data is not stored locally. This is not much of a surprise. The Pig script is loading data from a file and gets distributed evenly across the Hadoop cluster based on that input data. That distribution is of course not the same as the resulting region distribution in HBase. Secondly we also notice that for the first 10 minutes only a single HBase region server is hit, then a second and only after 14 minutes do all 5 HBase region servers process data.
At the beginning of the job only a single HBase region Server consumes CPU while all others remain idle
This is also well documented and has to do with HBase regions and splits (read this very good blog by Hortonworks if you want to know more).
And lastly we also see that this was a map-only job.
When looking at the break down in time we see that the kmerLoad jobs do not spend any time in reducing or shuffling but only in the mapping phase
More specifically, we spend nearly all of the time in HBase. As we are loading data into HBase, we would expect it to be the primary contributor.
This break down shows that all MapTasks are processing and spend nearly all their time sending data to HBase
Next I took a look at the HBase inserts. We can see how many items we processed and inserted into HBase:
This shows the inserts per region server and HBase table/column family. We also see the number of roundtrips compared to the number of rows inserted
A couple of things are interesting here. First of all we see that the number of inserts is not equally distributed across the region servers. That is again due to the fact that the table was empty, and HBase started in a single region (see the previous screenshots and the Hortonworks blog). We can also compare the number of inserted rows with the number of roundtrips, making sure that we leverage batching correctly.
When comparing this with the number of mapped records we see that we are doing roughly 27 inserts per input record and each output record corresponds to a single HBase row.
This shows Details about the MapReduce Job and several counters. In this case we see the mapped records.
In summary, we can say that the loading is already pretty optimal; the CPU load on the cluster is fine and we insert ~1700 rows into HBase per roundtrip.
This shows a single task attempt inserting rows into HBase, we can see that it inserts many rows in a single roundtrip
Together with the fact that HBase really was the main contributor here, this means there was not much I could do to speed things up, at least not without some serious expert help (suggestions on how to improve this beyond pre-splitting are welcome). So let’s move on to the index job.

The Index Job

The second job was creating a size-based index over all inserted data; hence it had to read all the data in the table. This second step had run for 15 hours and did not complete; I terminated it at that point. By analyzing it I saw the root cause almost immediately.
The Database Overview shows a single Scan that makes a roundtrip to the HBase region server for every single row it retrieves
Look at the roundtrips and the row counts processed. There is one more roundtrip than there are rows! That means the MapReduce job is calling out to the HBase region server for every single row, which is very inefficient.
The default scan caching on a Pig HBase job should be 100, but that was obviously not the case. The performance breakdown showed me that we were indeed spending all of our time waiting on HBase.
The HBase region Server contributes over 95% to our job execution time
This flow also shows that we spend 95% of our time waiting on HBase
This flow also shows that we spend 95% of our time waiting on HBase
Because we are making one network roundtrip for every single row, the utilization of my cluster was miserable.
The CPU utilization in the hadoop cluster is bad, it only has a couple of spikes which seem to coincide with HBase region splits
Therefore adding more hardware would not have helped much – it was not the lack of resources that made my indexing job slow.

Pig really doesn’t like HBase…

According to the documentation, HBaseStorage has an option to set the scanner caching, which should lead to fewer roundtrips and more rows fetched in a single roundtrip. This is how it looks in the Pig script:
A = LOAD 'hbase://kmer'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
'sequence:*', '-loadKey true -caching 500')
AS (id:bytearray, sequence:map[]);
B = foreach A generate id, SIZE(sequence);
C = group B by $1 PARALLEL $p;
D = foreach C generate group, COUNT(B);
I ran a smaller sample to test this option. However when I looked at the result I saw that it was still reading one row at a time.
We see that the HBase scan is still doing a roundtrip for every single row it retrieves
Now I was really curious. I added a couple of sensors to get more visibility, and what I found was that HBaseStorage did pick up my caching configuration option as defined in the script, but the actual HBase Scanner was still being configured to use 1 as the cache size!
The two selected rows show that while the Pig option (the first) retrieves the caching option defined in my pig script, the Scanner itself still gets Caching size 1 (second selected row)
I did not give up there though! Pig also has a way to set global MapReduce job options, which is what I tried next. This can be done by adding the following to the Pig script:
SET hbase.client.scanner.caching 1000;
It didn't have any effect either. I could verify that the setting made it into the job.xml submitted to Hadoop by looking at the job tracker. I also verified that the Map Task itself was not aware of that setting at all.
This shows that the hbase.client.scanner.caching job option always returns 1, although the job.xml is set to 1000
Finally I launched a new Hadoop cluster via EMR and added the HBase option in the bootstrap script, making it a cluster-wide configuration setting:
--bootstrap-action s3://us-east-1.elasticmapreduce/bootstrap-actions/configure-hbase --args -s,hbase.client.scanner.caching=500
I ran the job again, and wow was it fast. My sample subset went down from 9 minutes to 2! My overall map time went down from 7 minutes to 25 seconds!
This shows the same job executed twice, once with caching option not working and once with a cluster wide setting that did work. The red rectangle shows the dramatic effect this has on map time
Satisfied that the scanner caching option finally worked, I ran my big job again. The improvement was massive. It went down from over 15 hours (where I terminated it!) to 75 minutes! That is over 15 times faster on the same hardware!
Looking at the transaction flow we see that we still spend most of our time in HBase, but it went down from over 95% to 72%. The shift meant that a lot more processing was done on the MapReduce side itself. To put this into context, this means that a lot more time was spent in the aggregation part of the Pig script.
This transaction flow shows that we now spend only 72% of the job time waiting on HBase and much more time doing actual processing
Thus we’ve turned this job from one mostly waiting on HBase and network to one that is actually leveraging the CPU. As a result the utilization of my cluster is much better now, albeit far from optimal.
The CPU load on the Cluster is much better now, compared to before.
There is still a lot of room for improvement: CPU utilization of my Hadoop nodes is not close to 100%, so we might be able to make this even faster, but that is work for another day. What’s important is that without the insight our APM solution gives us, I would never have known how to fix this!

Conclusion

While a Pig or Hadoop expert might have told me right away to set the caching option, even he wouldn’t have figured out easily why the option didn’t take effect. With APM I was able to verify the number of roundtrips and immediately saw that the option didn’t work; I didn’t need to spend days or hours to realize that. While I do not know why the option doesn’t work (any Hadoop developers here who can tell us?) I was able to remedy the situation and also verify that the site-wide option had the desired effect – with dramatic improvements!
When I ask customers how they deal with performance problems in MapReduce, they often tell me that, beyond generic Hadoop tuning, they don't bother. They simply add more hardware, because they cannot have experts spending days or weeks browsing through logs or looking at every job they execute. Now in this case, adding more hardware would not have helped as the existing cluster was well underutilized. And by leveraging an APM solution I was able to figure out the root cause, discover that the documented fix for this was not working, and come up with an alternate solution within an hour! I sped up my job by a factor of 15, without adding any more hardware to my cluster and without needing to have an expert go through log files!
