Tuesday, 21 October 2014

Snappy compression with Pig and native MapReduce

I am assuming you have Hadoop installed on your cluster; if not, please follow http://code.google.com/p/hadoop-snappy/
This is the machine configuration of my cluster nodes, though the steps below should work with other installations and machine configurations:
pkommireddi@pkommireddi-wsl:/tools/hadoop/pig-0.9.1/lib$ uname -a
Linux pkommireddi-wsl 2.6.32-37-generic #81-Ubuntu SMP Fri Dec 2 20:32:42 UTC 2011 x86_64 GNU/Linux
Pig requires that the Snappy jar and native library be available on its classpath when a script is run.
The Pig client here is installed under /tools/hadoop, and the jar needs to be placed in $PIG_HOME/lib:
/tools/hadoop/pig-0.9.1/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
You also need to point Pig to the Snappy native library:
export PIG_OPTS="$PIG_OPTS -Djava.library.path=$HADOOP_HOME/lib/native/Linux-amd64-64"
Now you have two ways to use map output compression in your Pig scripts:
  1. Follow the instructions at http://code.google.com/p/hadoop-snappy/ to set map output compression at the cluster level
  2. Use Pig’s “set” keyword for per-job configuration:
    set mapred.compress.map.output true;
    set mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;
This should get you going with Snappy for map output compression in Pig. You can read and write Snappy-compressed files as well, though I would not recommend it, as Snappy is not very space-efficient compared to other compression algorithms. There is work in progress to allow Snappy for the intermediate/temporary files created between multiple MR jobs. You can watch the work item here: https://issues.apache.org/jira/browse/PIG-2319
Using Snappy for Native Java MapReduce:
Set Configuration parameters for Map output compression
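JobConf conf = new JobConf(); // org.apache.hadoop.mapred.JobConf extends Configuration and provides setOutputFormat, which is used further down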
conf.setBoolean("mapred.compress.map.output", true);
conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
Set Configuration parameters for Snappy compressed intermediate Sequence Files
conf.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); //Block level is better than Record level, in most cases
SequenceFileOutputFormat.setCompressOutput(conf, true);
conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
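To see why block-level compression usually beats record-level compression, here is a minimal sketch that compresses the same records one at a time and then as a single block. It uses java.util.zip.Deflater purely as a stand-in, because Snappy is not part of the JDK; the class name, the sample records and the record count are all made up for illustration. The absolute numbers will differ with Snappy, but the effect should be similar: a compressor that sees many similar records together can exploit the redundancy between them.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.Deflater;

// Illustration only: Deflater stands in for Snappy, which is not in the JDK.
class BlockVsRecordCompression {

    // Compress one byte array and return the number of compressed bytes.
    static int compressedSize(byte[] input) {
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        deflater.setInput(input);
        deflater.finish();
        byte[] buffer = new byte[4096];
        int total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(buffer);
        }
        deflater.end();
        return total;
    }

    // Hypothetical log-like records; real map output will look different.
    static List<byte[]> sampleRecords(int count) {
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String line = "user=" + (i % 50) + ",action=click,page=/home,status=200";
            records.add(line.getBytes(StandardCharsets.UTF_8));
        }
        return records;
    }

    // "Record level": every record is compressed on its own.
    static int recordLevelSize(List<byte[]> records) {
        int total = 0;
        for (byte[] record : records) {
            total += compressedSize(record);
        }
        return total;
    }

    // "Block level": records are concatenated and compressed together.
    static int blockLevelSize(List<byte[]> records) {
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        for (byte[] record : records) {
            block.write(record, 0, record.length);
        }
        return compressedSize(block.toByteArray());
    }

    public static void main(String[] args) {
        List<byte[]> records = sampleRecords(1000);
        System.out.println("record-level bytes: " + recordLevelSize(records));
        System.out.println("block-level bytes:  " + blockLevelSize(records));
    }
}
```

With short, similar records like these, the block-level total should come out much smaller, which is the reason for choosing CompressionType.BLOCK above.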
Benefits:
  1. Map tasks begin transferring data sooner compared to Gzip or Bzip (though more data needs to be transferred to Reduce tasks)
  2. Reduce tasks run faster with better decompression speeds
  3. Snappy is not CPU intensive – which means MR tasks have more CPU for user operations
What you SHOULD use Snappy for
Map output: Snappy works great if you have large amounts of data flowing from Mappers to the Reducers (you might not see a significant difference if data volume between Map and Reduce is low)
Temporary intermediate files (not available as of Pig 0.9.2, so currently applicable only to native MapReduce): If you have a series of MR jobs chained together, Snappy compression is a good way to store the intermediate files. Please make sure these intermediate files are cleaned up promptly so they don’t cause disk space issues on the cluster.
What you should NOT use Snappy for
Permanent storage: Snappy is not space-efficient compared to other codecs, and storing data on HDFS is expensive because every block is kept three times (3-way replication).
Plain text files: Like Gzip, Snappy is not splittable. Do not store plain text files in Snappy-compressed form; use a container format like SequenceFile instead.

Hadoop java.lang.OutOfMemoryError: Java heap space

Ever run into an out-of-memory error with your MR tasks? This can happen if you are using large lookups (DistributedCache) or data structures holding a huge number of entries, and the heap allocated to the MapReduce tasks is not sufficient to handle them.
Stepping back, each map and reduce task is launched in a separate JVM. The heap allocated to that JVM is based on the configuration parameter mapred.child.java.opts. If it is too small for your data, the Hadoop job will fail, complaining that it does not have sufficient heap to continue. Try increasing the heap size through Configuration, but keep in mind the amount of RAM, the number of cores and the number of MR tasks configured on the nodes while adjusting the parameter.
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx2048m</value>
</property>
Here I have increased the heap size to 2 GB. Of course, I had plenty of RAM on the cluster nodes to be able to do so. Adding this as a resource to Configuration is one way; another is to set it directly on the Configuration by invoking the set method.

set

public void set(String name, String value)
Set the value of the name property.
Parameters:
name – property name.
value – property value.
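Before raising mapred.child.java.opts, it helps to do a quick back-of-the-envelope check against the RAM on each node. The sketch below is only that, a rough check: the class name, the slot counts, the node sizes and the amount of memory reserved for the OS and Hadoop daemons are hypothetical numbers, not values from my cluster. The slot counts correspond to what you have configured as the maximum number of map and reduce tasks per node. Keep in mind that a JVM uses more memory than its -Xmx value, so treat the result as a lower bound.

```java
// Rough per-node heap budget check; all numbers in main are made-up examples.
class HeapBudget {

    // Heap needed if every map and reduce slot on the node is busy at once.
    static long worstCaseHeapMb(int mapSlots, int reduceSlots, long childHeapMb) {
        return (long) (mapSlots + reduceSlots) * childHeapMb;
    }

    // True if the worst case fits into the RAM left after reserving some
    // memory for the OS and the Hadoop daemons running on the node.
    static boolean fits(long nodeRamMb, long reservedMb,
                        int mapSlots, int reduceSlots, long childHeapMb) {
        return worstCaseHeapMb(mapSlots, reduceSlots, childHeapMb)
                <= nodeRamMb - reservedMb;
    }

    public static void main(String[] args) {
        long childHeapMb = 2048; // matches -Xmx2048m
        int mapSlots = 8;        // hypothetical map slots per node
        int reduceSlots = 4;     // hypothetical reduce slots per node
        long reservedMb = 4096;  // hypothetical reserve for OS and daemons

        System.out.println("worst case heap (MB): "
                + worstCaseHeapMb(mapSlots, reduceSlots, childHeapMb));
        System.out.println("fits on a 32 GB node: "
                + fits(32768, reservedMb, mapSlots, reduceSlots, childHeapMb));
        System.out.println("fits on a 16 GB node: "
                + fits(16384, reservedMb, mapSlots, reduceSlots, childHeapMb));
    }
}
```

If the check fails, either lower the heap size or reduce the number of task slots per node.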