
Monday 20 October 2014

Pivotal HD and Greenplum Database Integration


In this example, I will generate some dummy data with a SQL statement in GPDB and load it into PHD with an INSERT statement. I will then read the data back out of PHD with a SELECT statement.
One-Time Configuration Steps
1. Download the Pivotal HD release that matches the version running on your Hadoop cluster.
https://network.gopivotal.com/products/pivotal-hd
2. Copy the file to the Master server in the GPDB cluster. In my example, gpdbvm43 is a single-node VM with 2 segments.
scp PHD-2.0.1.0-148.tar.gz root@gpdbvm43:/root
3. ssh as gpadmin to the GPDB 4.3 instance.
vi .bashrc
[add these entries]
export JAVA_HOME=/usr/java/latest
export HADOOP_HOME=/usr/lib/gphd
Note: this needs to be repeated on every host in the cluster; you can use gpscp or gpssh to make these changes on all hosts.
4. Make sure the database is running.
gpstart -a
5. Change the gp_hadoop_target_version to be compatible with Pivotal HD 2.0 and greater (you can verify the setting from psql after the restart in step 8; see the sketch after that step).
gpconfig -c gp_hadoop_target_version -v gphd-2.0
6. Add an entry to your /etc/hosts for all of the hosts in the Hadoop cluster. This needs to be done on all hosts in the GPDB cluster too. I’m using a single node VM of PHD so I just have one entry.
vi /etc/hosts 
[add every node in the PHD cluster]
192.168.239.203 pivhdsne.localdomain pivhdsne
Note: You can then use gpscp to copy the revised hosts file to the other hosts in the cluster.
7. Install the PHD client in the GPDB cluster.
su - 
tar --no-same-owner -zxvf PHD-2.0.1.0-148.tar.gz
cd PHD-2.0.1.0-148/utility/rpm
rpm -i *.rpm
cd ../../zookeeper/rpm
rpm -i *.rpm
cd ../../hadoop/rpm
yum install nc
rpm -i *.rpm
exit
Note: You can use gpscp to copy the tar.gz file to the other hosts in the cluster and then use gpssh to execute these commands. Be sure to source the greenplum_path.sh after connecting as root. “nc” may not be needed on your cluster but was required with my VM.
8. Now that you are gpadmin again, bounce the database.
gpstop -r
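Once the database is back up, you can sanity-check the step 5 setting from psql. This is just a quick check; SHOW works for any server configuration parameter and should report the gphd-2.0 value configured above.
-- should return gphd-2.0 as configured in step 5
SHOW gp_hadoop_target_version;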
Pivotal HD Configuration
You likely already have this done, but if you are using the single-node VM of Pivotal HD, you will need to edit its /etc/hosts file so that Hadoop is accessible remotely.
1. ssh as root to the VM
[root@pivhdsne ~]# ifconfig
eth1      Link encap:Ethernet  HWaddr 00:0C:29:20:A3:8F  
          inet addr:192.168.239.203  Bcast:192.168.239.255  Mask:255.255.255.0
          inet6 addr: fe80::20c:29ff:fe20:a38f/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:9168 errors:0 dropped:0 overruns:0 frame:0
          TX packets:1199 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000 
          RX bytes:653659 (638.3 KiB)  TX bytes:199320 (194.6 KiB)

lo        Link encap:Local Loopback  
          inet addr:127.0.0.1  Mask:255.0.0.0
          inet6 addr: ::1/128 Scope:Host
          UP LOOPBACK RUNNING  MTU:16436  Metric:1
          RX packets:3779367 errors:0 dropped:0 overruns:0 frame:0
          TX packets:3779367 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:0 
          RX bytes:4407394192 (4.1 GiB)  TX bytes:4407394192 (4.1 GiB)
Now vi /etc/hosts and map the pivhdsne hostname to the VM's IP address instead of 127.0.0.1.
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.239.203 pivhdsne pivhdsne.localdomain
GPDB External Tables
1. Example of Writable External Table to PHD.
CREATE WRITABLE EXTERNAL TABLE ext_foo
(i int, bar text) LOCATION ('gphdfs://pivhdsne/foo_bar') FORMAT 'text' (delimiter '|' null 'null');
2. Insert some data into PHD from GPDB.
INSERT INTO ext_foo SELECT i, 'bar_' || i FROM generate_series(1, 100) AS i;
3. Create External Table to view the data in PHD.
CREATE EXTERNAL TABLE ext_get_foo
(i int, bar text) LOCATION ('gphdfs://pivhdsne/foo_bar') FORMAT 'text' (delimiter '|' null 'null');
4. Select the data.
postgres=# SELECT * FROM ext_get_foo ORDER BY i LIMIT 10;
 i  |  bar   
----+--------
  1 | bar_1
  2 | bar_2
  3 | bar_3
  4 | bar_4
  5 | bar_5
  6 | bar_6
  7 | bar_7
  8 | bar_8
  9 | bar_9
 10 | bar_10
(10 rows)
Parallel!
Here is how you can see that it was done in parallel. Notice there are two files in the /foo_bar directory that I specified in the Writable External Table above.
[pivhdsne:~]$ hdfs dfs -ls /foo_bar
Found 2 items
-rw-r--r--   3 gpadmin hadoop        490 2014-05-24 00:19 /foo_bar/0_1400790662-0000004541
-rw-r--r--   3 gpadmin hadoop        494 2014-05-24 00:19 /foo_bar/1_1400790662-0000004541
There are two files because my single node VM of GPDB has two Segments. Each Segment wrote its file to Hadoop at the same time. Completely parallel and scalable.
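If you want to confirm the segment count, the gp_segment_configuration catalog table lists the primary segments. A quick check, run from psql on the master:
-- list primary segments (content >= 0 excludes the master)
SELECT content, hostname, port
FROM   gp_segment_configuration
WHERE  role = 'p' AND content >= 0;
With two segments, this returns two rows, matching the two files written to /foo_bar.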
More proof!
[pivhdsne:~]$ hdfs dfs -cat /foo_bar/0_1400790662-0000004541 | more
1|bar_1
3|bar_3
5|bar_5
7|bar_7
9|bar_9
...

[pivhdsne:~]$ hdfs dfs -cat /foo_bar/1_1400790662-0000004541 | more
2|bar_2
4|bar_4
6|bar_6
8|bar_8
10|bar_10
...
Extra Credit!
Log into HAWQ on the PHD cluster and create an External Table to the same files!
CREATE EXTERNAL TABLE ext_get_foo
(i int, bar text) LOCATION 
('pxf://pivhdsne:50070/foo_bar?profile=HdfsTextSimple') FORMAT 'text' (delimiter '|' null 'null');

gpadmin=# SELECT * FROM ext_get_foo ORDER BY i limit 10;
 i  |  bar   
----+--------
  1 | bar_1
  2 | bar_2
  3 | bar_3
  4 | bar_4
  5 | bar_5
  6 | bar_6
  7 | bar_7
  8 | bar_8
  9 | bar_9
 10 | bar_10
(10 rows)

As you can see, there are many ways to move data between Greenplum Database and Hadoop to satisfy a wide variety of business use cases.
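For example, to pull the Hadoop data into a regular Greenplum table for further processing, a CREATE TABLE AS over the readable external table works. This is just a sketch, assuming the ext_get_foo table defined above; the name foo_local is only an illustration.
-- materialize the HDFS data into a local Greenplum table, distributed by i
CREATE TABLE foo_local AS
SELECT * FROM ext_get_foo
DISTRIBUTED BY (i);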

XML Parsing with Pivotal

XML Parsing

There are many ways to handle XML files, but in this case I had very large files, so I needed a cluster of machines, and Hadoop is pretty good at that. The processing can be done with MapReduce or with a tool like Pig, which simplifies writing MapReduce jobs.
Solution 1
Steps
  • Load raw file to Hadoop
  • Transform XML to tab delimited file with Pig
  • Create External Table in HAWQ to read file data in Hadoop
Sample XML file.
<?xml version="1.0"?>
<catalog>
      <large-product>
         <name>foo1</name>
         <price>110</price>
      </large-product>
      <large-product>
         <name>foo2</name>
         <price>120</price>
      </large-product>
      <large-product>
         <name>foo3</name>
         <price>130</price>
      </large-product>
      <large-product>
         <name>foo4</name>
         <price>140</price>
      </large-product>
      <large-product>
         <name>foo5</name>
         <price>150</price>
      </large-product>
      <small-product>
         <name>bar1</name>
         <price>10</price>
      </small-product>
      <small-product>
         <name>bar2</name>
         <price>20</price>
      </small-product>
      <small-product>
         <name>bar3</name>
         <price>30</price>
      </small-product>
      <small-product>
         <name>bar4</name>
         <price>40</price>
      </small-product>
      <small-product>
         <name>bar5</name>
         <price>50</price>
      </small-product>
</catalog>
As you can see, there are two record sets, large products and small products, but I only want the small products in a table.
First, put the raw XML data into Hadoop.
hdfs dfs -mkdir /demo4
hdfs dfs -put catalog.xml /demo4
Here is the Pig script.
REGISTER /usr/lib/gphd/pig/piggybank.jar;
A = LOAD '/demo4/catalog.xml' 
USING org.apache.pig.piggybank.storage.XMLLoader('small-product') 
AS (doc:chararray);

clean = foreach A GENERATE FLATTEN(REGEX_EXTRACT_ALL(doc,'<small-product>\\s*<name>(.*)</name>\\s*<price>(.*)</price>\\s*</small-product>'))
AS (name:chararray,price:int);
store clean into '/demo4/alt_small_data';
Pig first extracts only the small-product records; this takes a single line in the script and is very useful. The next step uses regular expressions to parse each tag. This is very painful to get right because Pig uses MapReduce to parse the data, which is powerful but relatively slow to iterate on until you get it right. Even with a small file, each iteration took at least 30 seconds to execute, and the full file took 22 minutes.
The last step is to create an External Table in HAWQ.
DROP EXTERNAL TABLE IF EXISTS ext_alt_demo4;
CREATE EXTERNAL TABLE ext_alt_demo4
(
  name text, price int
)
 LOCATION (
    'pxf://pivhdsne:50070/demo4/alt_small_data/part*?profile=HdfsTextSimple'
)
 FORMAT 'text' (delimiter E'\t');
And selecting the data in HAWQ.
SELECT * FROM ext_alt_demo4;
 name | price 
------+-------
 bar1 |    10
 bar2 |    20
 bar3 |    30
 bar4 |    40
 bar5 |    50
(5 rows)

Time: 127.334 ms
This was my first approach to XML parsing, until I got frustrated with writing regular expressions for the many XML tags. The XML I had wasn't as neat as my example, so I had to re-run the Pig script over and over for each slight modification to the parsing logic.
Solution 2
This is the same basic process as Solution 1, but instead of parsing each record with regular expressions in Pig, I will create a single column and do the parsing with SQL in HAWQ.
Here is my Pig script.
REGISTER /usr/lib/gphd/pig/piggybank.jar;
A = LOAD '/demo4/catalog.xml' USING org.apache.pig.piggybank.storage.XMLLoader('small-product') AS (doc:chararray);
clean = foreach A generate REPLACE(REPLACE(doc, '\\u000D', ''), '\\u000A', '');
store clean into '/demo4/small_data';
So instead of using regular expressions, I'm removing the carriage-return and newline characters from the XML so that each record ends up on a single row, and then storing the result back in Hadoop.
Here is the External Table in HAWQ.
CREATE EXTERNAL TABLE ext_demo4
(
xml_data text
)
LOCATION (
'pxf://pivhdsne:50070/demo4/small_data/part*?profile=HdfsTextSimple'
)
FORMAT 'TEXT' (delimiter E'\t');
I then created a simple SQL function to parse the data.
CREATE OR REPLACE FUNCTION fn_extract_xml_value(p_tag text, p_xml text) RETURNS TEXT AS
$$
SELECT SPLIT_PART(SUBSTRING($2 FROM '<' || $1 || '>(.*)</' || $1 || '>'), '<', 1)
$$
LANGUAGE SQL;
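A quick way to sanity-check the function before pointing it at the external table is to call it on a literal record. This is just an illustrative test; the string below is one row of the cleaned-up small-product data.
-- should return bar1
SELECT fn_extract_xml_value('name', '<small-product><name>bar1</name><price>10</price></small-product>');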
And my SQL statement that parses the data.
SELECT (fn_extract_xml_value('name', xml_data))::text AS name, (fn_extract_xml_value('price', xml_data))::int AS price FROM ext_demo4;                 
 name | price 
------+-------
 bar1 |    10
 bar2 |    20
 bar3 |    30
 bar4 |    40
 bar5 |    50
(5 rows)

Time: 94.887 ms
The benefit of this second approach is the huge increase in speed when iterating on the XML parsing until it is correct. Instead of taking several minutes to validate my code in Pig, I could execute a SQL statement that ran in under a second, quickly modify the SQL function, and try again.
Summary
Hadoop is powerful and has become commodity software, with many distributions available that are all pretty much the same. The difference between distributions is the software that is unique to each vendor: some vendors rely on their management tools, while Pivotal HD has HAWQ, the most robust SQL engine for Hadoop. This example shows how you can leverage the built-in functionality of Hadoop plus HAWQ to be more productive than with any other Hadoop distribution.