Wednesday, 30 July 2014

Apache Spark – a Fast Big Data Analytics Engine


Introduction

There are different approaches in the big data world to make Hadoop more suitable for ad-hoc, interactive queries and iterative data processing. As is well known, the Hadoop MapReduce framework is primarily designed for batch processing, which makes it less suitable for ad-hoc data exploration, machine learning processes and the like. Big data vendors are trying to address this challenge by replacing MapReduce with alternatives. In the case of SQL on Hadoop, there are various initiatives: Cloudera Impala, Pivotal HAWQ or the Hortonworks Stinger initiative, which aims to improve Hive performance significantly.
Apache Spark is another increasingly popular alternative that replaces MapReduce with a more performant execution engine while still using Hadoop HDFS as the storage engine for large data sets.

Spark Architecture

From an architecture perspective, Apache Spark is based on two key concepts: Resilient Distributed Datasets (RDD) and a directed acyclic graph (DAG) execution engine. With regard to datasets, Spark supports two types of RDDs: parallelized collections, which are based on existing Scala collections, and Hadoop datasets, which are created from files stored on HDFS. RDDs support two kinds of operations: transformations and actions. Transformations create new datasets from the input (e.g. map or filter operations are transformations), whereas actions return a value after executing calculations on the dataset (e.g. reduce or count operations are actions).
The DAG engine helps to eliminate the MapReduce multi-stage execution model and offers significant performance improvements.
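As a quick illustration of the difference between transformations and actions, here is a minimal sketch that can be typed into the Spark shell introduced later in this post (the numbers are made up; sc is the SparkContext that the shell provides automatically):
// Minimal sketch: transformations are lazy, actions trigger execution.
val numbers = sc.parallelize(1 to 100)    // parallelized collection -> RDD[Int]
val evens   = numbers.filter(_ % 2 == 0)  // transformation: defines a new RDD, nothing runs yet
val doubled = evens.map(_ * 2)            // another transformation, still lazy
val total   = doubled.reduce(_ + _)       // action: the job is actually executed here
println(total)                            // prints 5100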
Figure 1: Spark Architecture

Installing Spark

Spark is written in Scala so before you install Spark, you need to install Scala. Scala binaries can be downloaded from http://www.scala-lang.org.
$ wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
$ tar xvf scala-2.10.4.tgz 
$ ln -s scala-2.10.4 scala
$ vi .bashrc
export SCALA_HOME=/home/istvan/scala
export PATH=$SCALA_HOME/bin:$PATH
You can validate your Scala installation by running the Scala REPL (the Scala command line interpreter); below is an example of how to execute the classic HelloWorld program in Scala:
$ scala
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_32).
Type in expressions to have them evaluated.
Type :help for more information.

scala> object HelloWorld {
     |     def main(args: Array[String]) {
     |         println("Hello, world!")
     |     }
     | }
defined module HelloWorld

scala> HelloWorld.main(null)
Hello, world!
Then you can download the Spark binaries from http://spark.apache.org/downloads.html. There are a couple of pre-compiled versions depending on your Hadoop distribution; we are going to use the Spark binaries built for the Cloudera CDH4 distribution.
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-0.9.0-incubating-bin-cdh4.tgz
$ tar xvf spark-0.9.0-incubating-bin-cdh4.tgz 
$ ln -s spark-0.9.0-incubating-bin-cdh4 spark

Using Spark shell

Now we are ready to run the Spark shell, which is a command line interpreter for Spark:
$ bin/spark-shell
14/03/31 15:54:51 INFO HttpServer: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/03/31 15:54:51 INFO HttpServer: Starting HTTP Server
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_32)
Type in expressions to have them evaluated.
Type :help for more information.
....
14/03/31 15:54:59 INFO HttpServer: Starting HTTP Server
...
Created spark context..
Spark context available as sc.

scala>
In our example we are going to process Apache weblogs that follow the common log format with the following fields: hostname, timestamp, request, HTTP status code and number of bytes. The test file that we are using in this example is based on the public NASA weblog from August 1995, see http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html. The file was cleaned and modified to use a tab-separated format.
Let us assume that you want to count how many hits the NASA web server got in August 1995. In order to get the result, you can run the following commands in spark-shell:

scala> val accessLog = sc.textFile("hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95")
14/03/31 16:18:16 INFO MemoryStore: ensureFreeSpace(82970) called with curMem=0, maxMem=311387750
14/03/31 16:18:16 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 81.0 KB, free 296.9 MB)
accessLog: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12

scala> accessLog.count()
14/03/31 16:18:24 INFO FileInputFormat: Total input paths to process : 1
14/03/31 16:18:24 INFO SparkContext: Starting job: count at :15
14/03/31 16:18:24 INFO DAGScheduler: Got job 0 (count at :15) with 2 output partitions (allowLocal=false)
...
14/03/31 16:18:26 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:134217728+27316431
..
14/03/31 16:18:26 INFO SparkContext: Job finished: count at :15, took 2.297566932 s
res0: Long = 1569898

The first command creates an RDD from the NASA Apache access log stored on HDFS (hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95). The second command (an action executed on the accessLog RDD) will count the number of lines in the file. Note that the execution time is around 2 seconds.
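Since we will run several more operations on the same accessLog RDD below, it can be worth keeping it in memory. This is not part of the original session, but a one-liner using Spark's standard cache() method does it:
// Optional: cache the RDD so that the subsequent filter/count and byte-sum
// actions do not have to re-read the file from HDFS.
accessLog.cache()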
If you want to know how many requests were initiated from one particular host (e.g. beta.xerox.com in our example), you need to execute a filter transformation and then run a count action on the filtered dataset:
scala> val filteredLog = accessLog.filter(line => line.contains("beta.xerox.com"))

filteredLog: org.apache.spark.rdd.RDD[String] = FilteredRDD[2] at filter at :14

scala> filteredLog.count()
14/03/31 16:19:35 INFO SparkContext: Starting job: count at :17
14/03/31 16:19:35 INFO DAGScheduler: Got job 1 (count at :17) with 2 output partitions (allowLocal=false)
...
14/03/31 16:19:35 INFO Executor: Running task ID 2
...
14/03/31 16:19:35 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:0+134217728
14/03/31 16:19:36 INFO Executor: Serialized size of result for 2 is 563
...
14/03/31 16:19:36 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:134217728+27316431
...
14/03/31 16:19:37 INFO DAGScheduler: Stage 1 (count at :17) finished in 1.542 s
14/03/31 16:19:37 INFO SparkContext: Job finished: count at :17, took 1.549706892 s
res1: Long = 2318

You can also execute more complex logic and define your own functions, thanks to Scala language capabilities. Let us assume that we need to calculate the total number of bytes generated by the NASA web server in August 1995. The number of bytes is the last (5th) field in each line of the weblog file and, unfortunately, there are cases when the field is ‘-‘ instead of the expected integer value. The standard toInt Scala String method throws an exception if you try to convert a non-numeric value to an Integer. Thus we need to be able to identify whether a given string is a number or not, and if not, return 0. This requires a custom function (convertToInt) that extends the standard Scala String class via an implicit conversion, making it available on the String data type. Then we can use this custom function together with the standard Spark RDD operations to calculate the total number of bytes generated by the NASA webserver.
scala> def isNumeric(input: String): Boolean = input.forall(_.isDigit)
isNumeric: (input: String)Boolean


scala> class StringHelper(s:String) {
     |    def convertToInt():Int = if (isNumeric(s)) s.toInt else 0
     | }
defined class StringHelper


scala> implicit def stringWrapper(str: String) = new StringHelper(str)
warning: there were 1 feature warning(s); re-run with -feature for details
stringWrapper: (str: String)StringHelper


scala> "123".convertToInt
res2: Int = 123

scala> "-".convertToInt
res4: Int = 0

scala> accessLog.map(line=>line.split("\t")).map(line=>line(4).convertToInt).sum
14/04/01 13:12:39 INFO SparkContext: Starting job: sum at :21
14/04/01 13:12:39 INFO DAGScheduler: Got job 2 (sum at :21) with 2 output partitions (allowLocal=false)
...
14/04/01 13:12:40 INFO HadoopRDD: Input split: hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95:0+134217728
... 
14/04/01 13:12:45 INFO SparkContext: Job finished: sum at :21, took 5.53186208 s
res4: Double = 2.6828341424E10
As you can see, the result is about 26.8 GB and the calculation was executed in roughly 5.5 seconds.
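As a side note, the same protection against the ‘-‘ values could be written without the implicit class by using scala.util.Try; a minimal sketch, assuming the same accessLog RDD from the session above:
// Alternative sketch: fall back to 0 for non-numeric byte counts using Try.
import scala.util.Try
val totalBytes = accessLog
  .map(_.split("\t"))
  .map(fields => Try(fields(4).toLong).getOrElse(0L))
  .sum()
println(totalBytes)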

Programming Spark

In addition to spark-shell, which can be used to execute operations interactively, you can also write and build your code in the Scala, Java or Python programming languages. Let us take an example of how you can implement the weblog application in Scala.
In order to build your application, you need to follow the directory structure as shown below:
./
./simple.sbt
./src/main/scala/WeblogApp.scala
./project/build.properties
./sbt
You can copy the sbt build tool from your Spark home directory (cp -af $SPARK_HOME/sbt/* ./sbt).
The simple.sbt build file should look something like this:
name := "Weblog Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.4.0"

And the WeblogApp.scala code is as follows:
import org.apache.spark._

object WeblogApp {
    def main(args: Array[String]) {
        val file = "hdfs://localhost/user/cloudera/weblog/NASA_access_log_Aug95";
        val sc = new SparkContext("local", "WeblogApp",
                                  System.getenv("SPARK_HOME"), 
                                  SparkContext.jarOfClass(this.getClass))
        val accessLog = sc.textFile(file)

        println("Number of entries: " + accessLog.count())
    }
}
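If you want the standalone application to reproduce the other statistics from the shell session as well, the additions to the same main method could look something like the sketch below; it assumes the tab-separated layout with the byte count in field index 4, as described above:
        // Sketch: additional statistics inside WeblogApp.main
        // (assumes the same tab-separated file layout as in the shell session).
        import scala.util.Try
        val xeroxHits  = accessLog.filter(_.contains("beta.xerox.com")).count()
        val totalBytes = accessLog
          .map(_.split("\t"))
          .map(fields => Try(fields(4).toLong).getOrElse(0L))
          .reduce(_ + _)
        println("Requests from beta.xerox.com: " + xeroxHits)
        println("Total bytes served: " + totalBytes)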
Then you can build and run the application using the Scala sbt build tool:
$ sbt/sbt package
$ sbt/sbt run
Launching sbt from sbt/sbt-launch-0.12.4.jar
[info] Loading project definition from /home/istvan/project
[info] Set current project to Weblog Project (in build file:/home/istvan/)
[info] Running WeblogApp 
...
14/04/01 14:48:58 INFO SparkContext: Starting job: count at WeblogApp.scala:12
14/04/01 14:48:58 INFO DAGScheduler: Got job 0 (count at WeblogApp.scala:12) with 2 output partitions (allowLocal=false)
...
14/04/01 14:49:01 INFO SparkContext: Job finished: count at WeblogApp.scala:12, took 2.67797083 s
Number of entries: 1569898
14/04/01 14:49:01 INFO ConnectionManager: Selector thread was interrupted!
[success] Total time: 9 s, completed 01-Apr-2014 14:49:01

Conclusion

Apache Spark has started gaining significant momentum and is considered to be a promising alternative to support ad-hoc queries and iterative processing logic by replacing MapReduce. It offers interactive code execution using the Python and Scala REPLs, but you can also write and compile your applications in Scala and Java. There are various tools running on top of Spark such as Shark (SQL on Hadoop), MLlib (machine learning), Spark Streaming and GraphX. It will be interesting to see how it evolves.

 

Apache Phoenix: An SQL Driver for HBase


 

Introduction

HBase is one of the most popular NoSQL databases; it is available in all major Hadoop distributions and is also part of AWS Elastic MapReduce as an additional application. Out of the box it has its own data model operations such as Get, Put, Scan and Delete, and it does not offer SQL-like capabilities, as opposed to, for instance, Cassandra's query language, CQL.
Apache Phoenix is an SQL layer on top of HBase that supports the most common SQL operations such as CREATE TABLE, SELECT, UPSERT, DELETE, etc. It was originally developed by Salesforce.com engineers for internal use and was later open sourced. In 2013 it became an Apache incubator project.

Architecture

We have covered HBase in more detail in this article. Just a quick recap: the HBase architecture is based on three key components: the HBase Master server, HBase Region Servers and Zookeeper.
Figure: HBase architecture
The client needs to find the Region Servers in order to work with the data stored in HBase. In essence, regions are the basic elements for distributing tables across the cluster. In order to find the Region Servers, the client first has to talk to Zookeeper.
Figure: HBase region lookup via Zookeeper
The key elements of the HBase data model are tables, column families, columns and rowkeys. Tables are made of columns and rows. The individual elements at the column and row intersections (cells in HBase terms) are versioned based on their timestamp. Rows are identified by rowkeys, which are sorted – these rowkeys can be considered primary keys, and all the data in the table can be accessed via them.
The columns are grouped into column families; at table creation time you do not have to specify all the columns, only the column families. Columns have a prefix derived from the column family and their own qualifier; a column name looks like this: ‘contents:html’.
As we have seen, the classic HBase data model is not designed with SQL in mind. Under the hood it is a sorted multidimensional map. That is where Phoenix comes to the rescue: it offers an SQL skin on HBase. Phoenix is implemented as a JDBC driver. From an architecture perspective, a Java client using JDBC can be configured to work with the Phoenix driver and can connect to HBase using SQL-like statements. We will demonstrate how to use SQuirreL, a popular Java-based graphical SQL client, together with Phoenix.
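To make the "sorted multidimensional map" point a bit more concrete, the logical structure can be sketched with plain Scala type aliases (purely illustrative; in reality HBase stores rowkeys, qualifiers and values as byte arrays):
// Illustrative sketch of the HBase logical data model:
// rowkey -> column family -> column qualifier -> timestamp -> value
import scala.collection.immutable.SortedMap

object HBaseModelSketch {
  type Value    = Array[Byte]
  type Versions = SortedMap[Long, Value]       // timestamp -> cell value
  type Columns  = Map[String, Versions]        // qualifier (e.g. "html") -> versions
  type Families = Map[String, Columns]         // column family (e.g. "contents") -> columns
  type Table    = SortedMap[String, Families]  // rowkey (sorted) -> row content
}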

Getting Started with Phoenix

You can download Phoenix from the Apache download site. Different Phoenix versions are compatible with different HBase versions, so please read the Phoenix documentation to ensure you have the correct setup. In our tests we used Phoenix 3.0.0 with HBase 0.94; the Hadoop distribution was Cloudera CDH4.4 with Hadoop v1. The Phoenix package contains both Hadoop version 1 and version 2 drivers for the clients, so we had to use the appropriate Hadoop-1 files; see the details later on when talking about the SQuirreL client.
Once you have unzipped the downloaded Phoenix package, you need to copy the relevant Phoenix jar files to the HBase region servers in order to ensure that the Phoenix client can communicate with them; otherwise you may get an error message saying that the client and server jars are not compatible.
$ cd ~/phoenix/phoenix-3.0.0-incubating/common
$ cp phoenix-3.0.0-incubating-client-minimal.jar  /usr/lib/hbase/lib
$ cp phoenix-core-3.0.0-incubating.jar /usr/lib/hbase/lib
After copying the jar files to the region servers, you need to restart them.
Phoenix provides a command line tool called sqlline – it is a utility written in Python. Its functionality is similar to Oracle SQL*Plus or the MySQL command line tool; not too sophisticated, but it does the job for simple use cases.
Before you start using sqlline, you can create a sample database table, populate it and run some simple queries as follows:
$ cd ~/phoenix/phoenix-3.0.0-incubating/bin
$ ./psql.py localhost ../examples/web_stat.sql ../examples/web_stat.csv ../examples/web_stat_queries.sql
This will run a CREATE TABLE statement:
CREATE TABLE IF NOT EXISTS WEB_STAT (
     HOST CHAR(2) NOT NULL,
     DOMAIN VARCHAR NOT NULL,
     FEATURE VARCHAR NOT NULL,
     DATE DATE NOT NULL,
     USAGE.CORE BIGINT,
     USAGE.DB BIGINT,
     STATS.ACTIVE_VISITOR INTEGER
     CONSTRAINT PK PRIMARY KEY (HOST, DOMAIN, FEATURE, DATE)
);
Then load the data stored in the web_stat CSV file:
NA,Salesforce.com,Login,2013-01-01 01:01:01,35,42,10
EU,Salesforce.com,Reports,2013-01-02 12:02:01,25,11,2
EU,Salesforce.com,Reports,2013-01-02 14:32:01,125,131,42
NA,Apple.com,Login,2013-01-01 01:01:01,35,22,40
NA,Salesforce.com,Dashboard,2013-01-03 11:01:01,88,66,44
...
And then run a few sample queries on the table, e.g.:
-- Average CPU and DB usage by Domain
SELECT DOMAIN, AVG(CORE) Average_CPU_Usage, AVG(DB) Average_DB_Usage 
FROM WEB_STAT 
GROUP BY DOMAIN 
ORDER BY DOMAIN DESC;
Now you can connect to HBase using sqlline:
$ ./sqlline.py localhost
..
Connecting to jdbc:phoenix:localhost
Driver: org.apache.phoenix.jdbc.PhoenixDriver (version 3.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
..
Done
sqlline version 1.1.2
0: jdbc:phoenix:localhost> select count(*) from web_stat;
+------------+
|  COUNT(1)  |
+------------+
| 39         |
+------------+
1 row selected (0.112 seconds)
0: jdbc:phoenix:localhost> select host, sum(active_visitor) from web_stat group by host;
+------+---------------------------+
| HOST | SUM(STATS.ACTIVE_VISITOR) |
+------+---------------------------+
| EU   | 698                       |
| NA   | 1639                      |
+------+---------------------------+
2 rows selected (0.294 seconds)
0: jdbc:phoenix:localhost>
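The same queries can also be issued programmatically through the Phoenix JDBC driver. Below is a sketch in Scala (used here for consistency with the Spark part of this post); it assumes the Phoenix client jar is on the classpath and that the WEB_STAT sample table created above exists:
import java.sql.DriverManager

object PhoenixJdbcExample {
  def main(args: Array[String]): Unit = {
    // Load the Phoenix JDBC driver (may be unnecessary with JDBC 4 auto-registration).
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    // The connect string has the form jdbc:phoenix:<zookeeper quorum>.
    val conn = DriverManager.getConnection("jdbc:phoenix:localhost")
    try {
      val rs = conn.createStatement().executeQuery(
        "SELECT HOST, SUM(ACTIVE_VISITOR) FROM WEB_STAT GROUP BY HOST")
      while (rs.next()) {
        println(rs.getString(1) + " -> " + rs.getLong(2))
      }
    } finally {
      conn.close()
    }
  }
}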

Using SQuirreL with Phoenix

If you prefer to use a graphical SQL client with Phoenix, you can download e.g. SQuirreL from its website. After that, the first step is to copy the appropriate Phoenix driver jar file to the SQuirreL lib directory:
$ cd ~/phoenix
$ cp phoenix-3.0.0-incubating/hadoop-1/phoenix-3.0.0-incubating-client.jar ~/squirrel/lib
Now you are ready to configure the JDBC driver in SQuirreL client, as shown in the picture below:
Figure: Configuring the Phoenix JDBC driver in SQuirreL
Then you can connect to Phoenix using the appropriate connect string (jdbc:phoenix:localhost in our test scenario):
Figure: Connecting to Phoenix from SQuirreL
Once connected, you can start executing your SQL queries:
Figure: Executing SQL queries in SQuirreL

Phoenix on Amazon Web Services – AWS Elastic MapReduce with Phoenix

You can also use Phoenix with AWS Elastic MapReduce. When you create a cluster, you need to specify the Apache Hadoop version, then configure HBase as an additional application and define the bootstrap action that loads Phoenix onto your AWS EMR cluster. See the details in the pictures below:
Figures: Configuring HBase as an additional application and the Phoenix bootstrap action for the AWS EMR cluster
Once the cluster is running, you can log in to the master node using ssh and check your Phoenix configuration.
Figure: Checking the Phoenix configuration on the EMR master node

Conclusion

SQL is one of the most popular languages used by data scientists, and it is likely to remain so. With the advent of Big Data and NoSQL databases, the volume, variety and velocity of data have significantly increased, but the demand for traditional, well-known languages to process that data has not changed much. SQL on Hadoop solutions are gaining momentum, and Apache Phoenix is an interesting open source player offering an SQL layer on top of HBase.

 
