
Tuesday 21 October 2014

Parquet – columnar storage for Hadoop

Parquet is a columnar storage format for Hadoop that uses the concept of repetition/definition levels borrowed from Google Dremel. It provides efficient encoding and compression schemes, and it gains extra efficiency by applying them on a per-column basis: compression works better because all the values in a column have the same type, and encoding works better because values within a column are often repeated. Here is a nice blog post from Julien describing Parquet internals.
Parquet can be used by any project in the Hadoop ecosystem; integrations are provided for M/R, Pig, Hive, Cascading and Impala.
I am by no means an expert at this, and a lot of what I write here is based on my conversations with a couple of key contributors on the project (@J_ and @aniket486). Also, most of the content in this post is based on the Pig+Parquet integration. We at Salesforce.com have started using Parquet for application log processing with Pig and are encouraged by the preliminary performance results.
Writing a Parquet file
There is parquet.hadoop.ParquetWriter. You need to decide which ObjectModel you want to use; it could be Thrift, Avro, Pig or the example model. Here is a function for writing a file using the Pig model (TupleWriteSupport):
private void write(String pigSchemaString, String writePath) throws Exception {
    Schema pigSchema = Utils.getSchemaFromString(pigSchemaString);
    TupleWriteSupport writeSupport = new TupleWriteSupport(pigSchema);

    FileSystem fs = FileSystem.get(new Configuration());
    Path path = new Path(writePath);
    if (fs.exists(path)) {
        fs.delete(path, true);
    }

    // Arguments: path, write support, codec, block size, page size, enableDictionary, validating
    ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(path, writeSupport,
            CompressionCodecName.UNCOMPRESSED, ParquetWriter.DEFAULT_BLOCK_SIZE,
            ParquetWriter.DEFAULT_PAGE_SIZE, false, false);

    TupleFactory tf = TupleFactory.getInstance();
    for (int i = 0; i < NUM_RECORDS; i++) {
        Tuple t = tf.newTuple();
        for (int j = 0; j < pigSchema.size(); j++) {
            t.append(i + j);
        }
        writer.write(t);
    }
    writer.close();
}
“pigSchemaString” is the schema for the Parquet file. This could be any valid Pig schema, such as “a:int, b:int, c:int”. Note that I insert integer values in the tuple, hence the schema fields are defined to be int.
So what exactly happened during the write? I use TupleWriteSupport, which is a WriteSupport implementation that helps us write Parquet files compatible with Pig. I then use ParquetWriter, passing in a few arguments:
  1. path – file path to write to
  2. writeSupport – TupleWriteSupport in this case
  3. compressionCodecName – could be UNCOMPRESSED, GZIP, SNAPPY, LZO
  4. blockSize – the total size used by a block (row group), 128MB by default
  5. pageSize – from the Parquet docs: “pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk. We recommend 8KB for page sizes.” The default page size is 1MB
  6. enableDictionary – turn on/off dictionary encoding
At the end, I create tuples with a few elements and write them to the Parquet file. A variation of the writer call with compression and dictionary encoding turned on is sketched below.
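For reference, here is what the same call could look like with compression and dictionary encoding enabled. This is just a sketch using the same 7-argument constructor as above; SNAPPY is one of the codecs listed earlier and the other arguments are unchanged.

// Same constructor as above, but with SNAPPY compression and dictionary
// encoding enabled; the last boolean still leaves schema validation off.
ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(
        path,
        writeSupport,
        CompressionCodecName.SNAPPY,        // compressionCodecName
        ParquetWriter.DEFAULT_BLOCK_SIZE,   // blockSize
        ParquetWriter.DEFAULT_PAGE_SIZE,    // pageSize
        true,                               // enableDictionary
        false);                             // validating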
Reading a Parquet file
There is a bug that causes reads to fail when Parquet files written using the Pig model (TupleWriteSupport) are read directly with ParquetReader. See https://github.com/Parquet/parquet-mr/issues/195
However, you can use Pig to read the file.
A = load 'parquet_file' USING parquet.pig.ParquetLoader();

describe A;

{a: int, b: int, c: int}

B = foreach A generate a;

dump B;
Initial Findings
I ran the PerfTest and noticed it took longer to read 1 column than to read more columns:
Parquet file read took : 1790(ms) for 1 columns
Parquet file read took : 565(ms) for 2 columns
Parquet file read took : 560(ms) for 3 columns
Parquet file read took : 544(ms) for 4 columns
Parquet file read took : 554(ms) for 5 columns
Parquet file read took : 550(ms) for 10 columns
Parquet file read took : 545(ms) for 20 columns
Parquet file read took : 563(ms) for 30 columns
Parquet file read took : 1653(ms) for 40 columns
Parquet file read took : 1049(ms) for 50 columns
That’s the JVM warmup phase!
The JIT compiler will inline methods based on how often they are called, so it waits to see what gets called often before doing so. Julien suggested I run it twice in a row (in the same process) and compare the times. This is generic and not particular to Parquet, but I wanted to highlight it in case you happen to run into similar perf numbers.
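Here is a rough sketch of that kind of measurement. readColumns() is a hypothetical helper standing in for whatever the PerfTest does to read the file with a given projection; the point is simply to run the same reads repeatedly in one JVM and compare iterations.

// Repeat the same reads in a single JVM so later iterations reflect
// steady-state (JIT-compiled) performance rather than warmup cost.
for (int iteration = 0; iteration < 3; iteration++) {
    for (int numColumns : new int[] {1, 2, 3, 4, 5, 10, 20, 30, 40, 50}) {
        long start = System.currentTimeMillis();
        readColumns(numColumns);   // hypothetical: read the Parquet file projecting numColumns columns
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("Parquet file read took : " + elapsed + "(ms) for "
                + numColumns + " columns, iteration " + iteration);
    }
}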
Voila! 2nd iteration did provide better results.
Parquet file read took : 1809(ms) for 1 columns, iteration 0
Parquet file read took : 563(ms) for 2 columns, iteration 0
Parquet file read took : 562(ms) for 3 columns, iteration 0
Parquet file read took : 548(ms) for 4 columns, iteration 0
Parquet file read took : 548(ms) for 5 columns, iteration 0
Parquet file read took : 554(ms) for 10 columns, iteration 0
Parquet file read took : 548(ms) for 20 columns, iteration 0
Parquet file read took : 550(ms) for 30 columns, iteration 0
Parquet file read took : 1603(ms) for 40 columns, iteration 0
Parquet file read took : 1054(ms) for 50 columns, iteration 0
Parquet file read took : 536(ms) for 1 columns, iteration 1
Parquet file read took : 531(ms) for 2 columns, iteration 1
Parquet file read took : 527(ms) for 3 columns, iteration 1
Parquet file read took : 532(ms) for 4 columns, iteration 1
Parquet file read took : 527(ms) for 5 columns, iteration 1
Parquet file read took : 533(ms) for 10 columns, iteration 1
Parquet file read took : 537(ms) for 20 columns, iteration 1
Parquet file read took : 534(ms) for 30 columns, iteration 1
Parquet file read took : 538(ms) for 40 columns, iteration 1
Parquet file read took : 1966(ms) for 50 columns, iteration 1
Parquet file read took : 523(ms) for 1 columns, iteration 2
Parquet file read took : 525(ms) for 2 columns, iteration 2
Parquet file read took : 691(ms) for 3 columns, iteration 2
Parquet file read took : 529(ms) for 4 columns, iteration 2
Parquet file read took : 530(ms) for 5 columns, iteration 2
Parquet file read took : 531(ms) for 10 columns, iteration 2
Parquet file read took : 532(ms) for 20 columns, iteration 2
Parquet file read took : 532(ms) for 30 columns, iteration 2
Parquet file read took : 1032(ms) for 40 columns, iteration 2
Parquet file read took : 1044(ms) for 50 columns, iteration 2
However, it's interesting to note that there isn't a huge difference in read time for 1 column vs. more. This can be attributed to the fact that this test ran on a dataset of only 100k rows (a few MBs). As explained later, the advantages of the Parquet format are more apparent with large files.
I ran another test: reading a regular text file vs. a Parquet file. The file contained 20M rows and 6 columns. The results didn't seem right, both storage- and processing-wise. Was I missing something?
Text file size : 513M
Parquet file size : 895M

Time taken to read, Text : 13784ms
Time taken to read, Parquet: 19551ms
By default compression is not enabled, which is why the Parquet file is larger: footers, headers, and summary files take up additional space. Note that Parquet stores metadata about each page, column chunk, and file so that it can determine the exact pages a query needs to load (you can find additional info here).
Also, if you are reading all the columns, it is expected that the columnar format will be slower, since row storage is more efficient when you read every column. Project fewer columns and you should see a difference, along with a projection pushdown message in the logs.
Yes, you need bigger files to get the benefits of columnar storage.
At this point, I wanted to try out encodings and see how they play out on the overall storage. My next question: are different encodings (RLE, dictionary) to be provided by the client, or does Parquet figure out the right one to use based on the data? It turns out Parquet will use dictionary encoding if it can, but right now you need to turn that on: http://parquet.io/parquet-mr/site/1.0.0-SNAPSHOT/apidocs/index.html
parquet.enable.dictionary=true
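For jobs that write through ParquetOutputFormat (M/R, and, as far as I understand, Pig's ParquetStorer under the hood), one way to turn this on is to set the property on the Hadoop Configuration. A minimal sketch, assuming the job is created with this Configuration:

// Enable dictionary encoding for Parquet output via the job configuration.
Configuration conf = new Configuration();
conf.setBoolean("parquet.enable.dictionary", true);
// ... create the Job that writes through ParquetOutputFormat with this conf

If you are writing with the ParquetWriter constructor shown earlier, the enableDictionary boolean argument does the same thing.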
I finally did get some nice results after enabling dictionary encoding and filtering on a single column. It was a lot better storage-wise too once dictionary encoding was enabled. (The following was run on a larger dataset.)
1st iteration:

Text file read took : 42231ms
Parquet file read took : 27853ms

Here are the numbers from the 2nd iteration, just to negate the effects of JVM warmup and to be fair to the text/row format :)

2nd iteration:

Text file read took : 36555ms
Parquet file read took : 27669ms
Schema Management
Parquet can handle multiple schemas. This is important for our use case at SFDC for log processing. We have several different types of logs, each with its own schema, and we have a few hundred of them. Most Pig queries run against a few log types. Parquet merges schemas and provides the ability to parse out columns from different files.
LogType A : organizationId, userId, timestamp, recordId, cpuTime
LogType V : userId, organizationId, timestamp, foo, bar
A query that tries to parse the organizationId and userId from the 2 log types should be able to do so correctly, even though they are positioned differently in the schemas. With Parquet, that's not a problem: it will merge the 'A' and 'V' schemas and project the columns accordingly. It does so by maintaining a file schema in addition to the merged schema and resolving the columns by referencing the two.
Projection Pushdown
One of the advantages of a columnar format is the fact that it can read only those parts from a file that are necessary. The columns not required are never read, avoiding unnecessary and expensive I/O.
To do this in Pig, just pass the required schema into the constructor of ParquetLoader.
A = LOAD '/parquet/file' USING parquet.pig.ParquetLoader('a:int, b:int');
The above query loads columns 'a' and 'b' only. When you do so, you should find a message similar to the following in the logs:
Assembled and processed 4194181 records from 2 columns in 2124 ms: 1974.6615 rec/ms, 1974.6615 cell/ms
If you hadn't done that, all 16 columns of the file would be loaded:
Assembled and processed 4194181 records from 16 columns in 6313 ms: 664.3721 rec/ms, 10629.953 cell/ms
Summary Files
Parquet generates a summary file for all the part files generated under a directory (job output). The summary file reduces the number of calls to the namenode and individual slaves while producing the splits, which significantly reduces the latency to start a job. Without it, Parquet would have to open the footer of every part file, which is occasionally slowed down by the namenode or by a bad slave that we happen to hit. Reading one summary file reduces both the risk of hitting a slow slave and the load on the namenode.
For example, suppose the output directory to which Parquet files are written by a Pig script is '/user/username/foo':
STORE someAlias INTO '/user/username/foo' using parquet.pig.ParquetStorer();
This will create part files under 'foo'; the number of part files depends on the number of reducers.
/user/username/foo/part-r-00000.parquet

/user/username/foo/part-r-00001.parquet

/user/username/foo/part-r-00002.parquet

/user/username/foo/part-r-00003.parquet
The summary file is generated when the Hadoop job writing the files finishes, since it is created in the output committer of the output format (ParquetOutputCommitter.commitJob). It reads all the footers in parallel and creates the summary file so that all subsequent LOADs or reads on the directory 'foo' can be more efficient.
There is one summary file for all the part files output by the same job. That is, one per directory containing multiple part files.
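After the job completes, listing the output directory should show the summary file sitting next to the part files listed above. The _metadata file name is parquet-mr's default for the summary file; treat the path below as illustrative.

/user/username/foo/_metadata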
Hadoop Compatibility
Anyone who has been part of a major Hadoop upgrade should be familiar with how painful the process can be. At SFDC, we moved from a really old version, 0.20.2, to 2.x (recently declared GA). This involved upgrading a ton of dependencies, making client-side changes to use the newer APIs, adding a bunch of new configurations, and eliminating a whole lot of deprecated stuff. Though this was a major upgrade and most upgrades from here on should be smooth(er), it always helps if dependent and 3rd-party libraries don't need to be recompiled.
With Parquet, you should not need to recompile for Hadoop 2. It hides all the Hadoop 2 incompatibilities behind reflective calls, so the same jars will work.
And finally ..
We at Salesforce.com have been early adopters of several big data open source technologies. Hadoop, Pig, HBase, Kafka, Zookeeper and Oozie, to name a few, either have been or are in the process of making it to production. Phoenix, a SQL layer on top of HBase, is a project that was homegrown and is now open-sourced. Parquet is the latest addition, and we are looking forward to using it for more datasets (Oracle exports, for example) in the near future, not just application logs. The Parquet community is helpful and open to new ideas and contributions, which is great for any open source project.

Monday 20 October 2014

HAWQ with Parquet Files

Here is a quick example showing how HAWQ works with Parquet files.
First, create a table with some data. You can either use the CTAS method or the more traditional CREATE TABLE and then INSERT. Either method works; it is up to your preference. This example generates only 100 records of fake customer data.
CTAS
CREATE TABLE CUSTOMER 
WITH (appendonly=true, orientation=parquet)
AS
SELECT i AS id, 'jon' || i AS fname, 'roberts' || i AS lname, i::text || ' main street'::text AS address, 'new york'::text AS city, 'ny'::text AS state, lpad(i, 5, '0') AS zip
FROM (SELECT generate_series(1, 100) AS i) AS sub
DISTRIBUTED BY (id);
CREATE and then INSERT
CREATE TABLE customer
(
  id integer,
  fname text,
  lname text,
  address text,
  city text,
  state text,
  zip text
)
WITH (APPENDONLY=true, ORIENTATION=parquet, 
  OIDS=FALSE
)
DISTRIBUTED BY (id);

INSERT INTO customer
SELECT i AS id, 'jon' || i AS fname, 'roberts' || i AS lname, i::text || ' main street'::text AS address, 'new york'::text AS city, 'ny'::text AS state, lpad(i, 5, '0') AS zip
FROM (SELECT generate_series(1, 100) AS i) AS sub;
Now you have data in the Parquet format in HAWQ. Pretty easy, huh?
Next, I'll use a nifty tool that queries the HAWQ catalog and tells me where the Parquet files are.
gpextract -o customer.yaml -W customer -dgpadmin
And here is the customer.yaml file it created.
DBVersion: PostgreSQL 8.2.15 (Greenplum Database 4.2.0 build 1) (HAWQ 1.2.0.1 build
  8119) on x86_64-unknown-linux-gnu, compiled by GCC gcc (GCC) 4.4.2 compiled on Apr
  23 2014 16:12:32
DFS_URL: hdfs://phd1.pivotalguru.com:8020
Encoding: UTF8
FileFormat: Parquet
Parquet_FileLocations:
  Checksum: false
  CompressionLevel: 0
  CompressionType: null
  EnableDictionary: false
  Files:
  - path: /hawq_data/gpseg0/16385/16554/16622.0
    size: 4493
  - path: /hawq_data/gpseg1/16385/16554/16622.0
    size: 4499
  PageSize: 1048576
  RowGroupSize: 8388608
TableName: public.customer
Version: 1.0.0
Notice the paths to the files, which are in Hadoop and in the Parquet format.
Now you can use a tool like Pig to look at the data.
grunt> A = load '/hawq_data/gpseg{0,1}/16385/16554/16622' USING parquet.pig.ParquetLoader();
grunt> describe A;                                                                          
A: {id: int,fname: bytearray,lname: bytearray,address: bytearray,city: bytearray,state: bytearray,zip: bytearray}
grunt> B = foreach A generate id, fname, lname, address, city, state, zip;
grunt> dump B;
(2,jon2,roberts2,2 main street,new york,ny,00002)
(4,jon4,roberts4,4 main street,new york,ny,00004)
(6,jon6,roberts6,6 main street,new york,ny,00006)
(8,jon8,roberts8,8 main street,new york,ny,00008)
.....
Parquet is easy to use in HAWQ and doesn't lock you into a Pivotal HD and HAWQ-only solution. It is easy to use other tools like Pig or MapReduce to read the Parquet files in your Hadoop cluster. No vendor lock-in.