Saturday 20 August 2016

Free Big Data Project with End to End Flow @ Kalyan







Pre-Requisites of Big Data Project:
hadoop-2.6.0
hbase-1.1.2
phoenix-4.7.0
flume-1.6.0
tomcat-7
java-1.7

NOTE: Make sure that all of the above components are installed.

You can follow this link to install the above components.

---------------------------------------------------------------------------------
   Follow the below instructions to work with Big Data Project
---------------------------------------------------------------------------------

Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`hbase-1.1.2-bin.tar.gz` ==> link
`phoenix-4.7.0-HBase-1.1-bin.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`apache-tomcat-7.0.70.tar.gz` ==> link

`kalyan.war` ==> link
`flume-phoenix.conf` ==> link
`phoenix-flume-4.7.0-HBase-1.1.jar` ==> link
`json-path-2.2.0.jar` ==> link


---------------------------------------------------------------------------------


Start Hadoop

Start HBase


Start Phoenix
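
The three start steps can be sketched as one shell function. The variable names `HADOOP_HOME`, `HBASE_HOME` and `PHOENIX_HOME` are assumptions (this post does not define them), so point them at your own installation directories. `sqlline.py` takes the ZooKeeper quorum as its argument; `localhost` matches the single-node setup used in the rest of this post.

```shell
# Sketch: start HDFS, YARN and HBase, then open the Phoenix SQL shell.
# HADOOP_HOME, HBASE_HOME and PHOENIX_HOME are assumed variable names.
start_stack() {
    "$HADOOP_HOME/sbin/start-dfs.sh" &&
    "$HADOOP_HOME/sbin/start-yarn.sh" &&
    "$HBASE_HOME/bin/start-hbase.sh" &&
    "$PHOENIX_HOME/bin/sqlline.py" localhost
}

# Usage: start_stack
```

Each step runs only if the previous one succeeded, so a failed HDFS start does not leave HBase half started.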


---------------------------------------------------------------------------------

Create the `users` and `productlog` tables in Phoenix with the queries below

CREATE TABLE users(userid bigint PRIMARY KEY, username varchar, password varchar, email varchar, country varchar, state varchar, city varchar, date varchar);

CREATE TABLE IF NOT EXISTS productlog(userid bigint not null, username varchar, email varchar, date varchar  not null, product varchar  not null, transaction varchar, country varchar, state varchar, city varchar CONSTRAINT pk PRIMARY KEY (userid, date, product));



---------------------------------------------------------------------------------

Update `~/.bashrc` with the changes below

export TOMCAT_HOME=/home/hadoop/work/apache-tomcat-7.0.70
export PATH=$TOMCAT_HOME/bin:$PATH
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

---------------------------------------------------------------------------------

Copy the `kalyan.war` file to `$TOMCAT_HOME/webapps`


---------------------------------------------------------------------------------

Start the Tomcat server with this command:
$TOMCAT_HOME/bin/startup.sh


---------------------------------------------------------------------------------

Generate sample users to work with `Big Data Project` using the command below (syntax first, then an example that generates 100 users)

java -cp $TOMCAT_HOME/webapps/kalyan/kalyan.jar:$TOMCAT_HOME/webapps/kalyan/WEB-INF/lib/*  com.orienit.kalyan.utils.GenerateUsers <no.of.users>

java -cp $TOMCAT_HOME/webapps/kalyan/kalyan.jar:$TOMCAT_HOME/webapps/kalyan/WEB-INF/lib/*  com.orienit.kalyan.utils.GenerateUsers 100


---------------------------------------------------------------------------------

Generate sample product logs to work with `Big Data Project` using the command below (syntax first, then an example that writes 10000 log entries to /tmp/product.log)

java -cp $TOMCAT_HOME/webapps/kalyan/kalyan.jar:$TOMCAT_HOME/webapps/kalyan/WEB-INF/lib/*  com.orienit.kalyan.utils.GenerateProductLog <path of the log file> <no.of.logs>

java -cp $TOMCAT_HOME/webapps/kalyan/kalyan.jar:$TOMCAT_HOME/webapps/kalyan/WEB-INF/lib/*  com.orienit.kalyan.utils.GenerateProductLog /tmp/product.log 10000



---------------------------------------------------------------------------------


Send the log changes to Phoenix using `Flume-Phoenix Integration`
(This is one of my contributions to Apache)


---------------------------------------------------------------------------------


Create the `flume-phoenix.conf` file with the content below

flume-phoenix.sources = execsource
flume-phoenix.sinks = phoenixsink
flume-phoenix.channels = memorychannel

flume-phoenix.sources.execsource.type = exec

flume-phoenix.sources.execsource.command = tail -F /tmp/product.log
flume-phoenix.sources.execsource.channels = memorychannel

flume-phoenix.sinks.phoenixsink.type = org.apache.phoenix.flume.sink.PhoenixSink

flume-phoenix.sinks.phoenixsink.channel = memorychannel
flume-phoenix.sinks.phoenixsink.batchSize = 10
flume-phoenix.sinks.phoenixsink.zookeeperQuorum = localhost
flume-phoenix.sinks.phoenixsink.table = productlog
flume-phoenix.sinks.phoenixsink.ddl = CREATE TABLE IF NOT EXISTS productlog(userid bigint not null, username varchar, email varchar, date varchar not null, product varchar not null, transaction varchar, country varchar, state varchar, city varchar CONSTRAINT pk PRIMARY KEY (userid, date, product))
flume-phoenix.sinks.phoenixsink.serializer = json
flume-phoenix.sinks.phoenixsink.serializer.columnsMapping = {"userid":"userid", "username":"username", "email":"email", "date":"date", "product":"product", "transaction":"transaction", "country":"country", "state":"state", "city":"city"}
flume-phoenix.sinks.phoenixsink.serializer.partialSchema = true
flume-phoenix.sinks.phoenixsink.serializer.columns=userid,username,email,date,product,transaction,country,state,city

flume-phoenix.channels.memorychannel.type = memory

flume-phoenix.channels.memorychannel.capacity = 1000
flume-phoenix.channels.memorychannel.transactionCapacity = 100
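
The exact line format written by `GenerateProductLog` is not shown in this post. Since the sink uses the `json` serializer with the `columnsMapping` above, a reasonable assumption is one JSON object per line with those nine keys. The sketch below prints such a line with made-up sample values, which can help when testing the agent by hand.

```shell
# Sketch: print one JSON log line whose keys match columnsMapping.
# The field values and the date format are made-up sample data.
sample_event() {
    printf '{"userid":%d,"username":"%s","email":"%s","date":"%s","product":"%s","transaction":"%s","country":"%s","state":"%s","city":"%s"}\n' \
        "$1" "user$1" "user$1@example.com" "2016-08-20 10:15:00" \
        "laptop" "success" "India" "Telangana" "Hyderabad"
}

# Usage: append a test event to the file that the exec source tails.
# sample_event 1 >> /tmp/product.log
```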



---------------------------------------------------------------------------------

Copy the `flume-phoenix.conf` file to `$FLUME_HOME/conf` folder

---------------------------------------------------------------------------------


Copy `$FLUME_HOME/conf/flume-env.sh.template` within the `$FLUME_HOME/conf` folder and name the copy `flume-env.sh`

---------------------------------------------------------------------------------

Update the `flume-env.sh` file with the change below

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

---------------------------------------------------------------------------------

Copy the `phoenix-flume-4.7.0-HBase-1.1.jar` file to `$FLUME_HOME/lib` folder

---------------------------------------------------------------------------------

Copy the `json-path-2.2.0.jar` file to `$FLUME_HOME/lib` folder
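
The copy steps above can be sketched as one shell function. It assumes the downloaded files all sit in a single directory (the function name and its two arguments are illustrative, not part of the project). The `JAVA_OPTS` line still has to be added to `flume-env.sh` by hand.

```shell
# Sketch: install the Flume-side files from a download directory.
# $1 = directory holding the downloaded files (assumed layout),
# $2 = Flume home directory (FLUME_HOME in this post).
install_flume_files() {
    src=$1
    flume_home=$2
    cp "$src/flume-phoenix.conf" "$flume_home/conf/" &&
    cp "$flume_home/conf/flume-env.sh.template" "$flume_home/conf/flume-env.sh" &&
    cp "$src/phoenix-flume-4.7.0-HBase-1.1.jar" "$flume_home/lib/" &&
    cp "$src/json-path-2.2.0.jar" "$flume_home/lib/"
}

# Usage (the download directory is an example):
# install_flume_files "$HOME/Downloads" "$FLUME_HOME"
```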

---------------------------------------------------------------------------------

Execute the command below to send the log changes to Phoenix

$FLUME_HOME/bin/flume-ng agent -n flume-phoenix --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/flume-phoenix.conf -Dflume.root.logger=DEBUG,console

---------------------------------------------------------------------------------

Verify the project output through the web UI:
http://localhost:8080/kalyan/home

---------------------------------------------------------------------------------

Stop the Tomcat server with this command:
$TOMCAT_HOME/bin/shutdown.sh


---------------------------------------------------------------------------------





Tuesday 12 July 2016

Free Workshop on Big Data E-commerce Project End-to-End Explanation


Details about the Workshop:

1. Understanding the software requirements specification (SRS).

2. Understanding the design of the product

3. Migrating existing project from RDBMS => HBASE

4. Migrating existing project from RDBMS => PHOENIX

5. Migrating existing project from RDBMS => CASSANDRA

6. Migrating existing project from RDBMS => MONGODB

7. Understanding the challenges with RDBMS, why to go to HADOOP

8. Understanding the challenges with HADOOP, why to go to SPARK

9. Visualize the REPORTS generated using RDBMS

10. Visualize the REPORTS generated using HADOOP COMPONENTS

11. Visualize the REPORTS generated using SPARK

12. Verify all of the above functionality through a LIVE PROJECT




Hadoop and Spark Real Time Project Workshop By KALYAN @ ORIENIT


Thursday 2 June 2016

How to resolve Flume Twitter Streaming issue

Problem with Flume Twitter streaming:

2016-06-02 12:36:21,765 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] 404:The URI requested is invalid or the resource requested, such as a user, does not exist.
Unknown URL. See Twitter Streaming API documentation at http://dev.twitter.com/pages/streaming_api

2016-06-02 12:36:21,766 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Waiting for 20000 milliseconds
2016-06-02 12:36:21,766 (Twitter Stream consumer-1[Waiting for 20000 milliseconds]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Twitter Stream consumer-1[Waiting for 20000 milliseconds]
2016-06-02 12:36:38,685 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-twitter.conf for changes
2016-06-02 12:36:41,766 (Twitter Stream consumer-1[Waiting for 20000 milliseconds]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] 404:The URI requested is invalid or the resource requested, such as a user, does not exist.
Unknown URL. See Twitter Streaming API documentation at http://dev.twitter.com/pages/streaming_api

2016-06-02 12:36:41,766 (Twitter Stream consumer-1[Waiting for 20000 milliseconds]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Establishing connection.
2016-06-02 12:36:41,766 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Twitter Stream consumer-1[Establishing connection]





Solution for Flume Twitter issue:

1. Create the "kalyan-twitter-agent.conf" file with the content below

kalyan-twitter-agent.sources = Twitter
kalyan-twitter-agent.channels = MemChannel
kalyan-twitter-agent.sinks = HDFS
 
kalyan-twitter-agent.sources.Twitter.type = com.orienit.kalyan.hadoop.training.flume.KalyanTwitterSource
kalyan-twitter-agent.sources.Twitter.channels = MemChannel
kalyan-twitter-agent.sources.Twitter.consumerKey = **********
kalyan-twitter-agent.sources.Twitter.consumerSecret = **********
kalyan-twitter-agent.sources.Twitter.accessToken = **********
kalyan-twitter-agent.sources.Twitter.accessTokenSecret = **********
 
kalyan-twitter-agent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing
 
kalyan-twitter-agent.sinks.HDFS.channel = MemChannel
kalyan-twitter-agent.sinks.HDFS.type = hdfs
kalyan-twitter-agent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/flume/tweets/%Y-%m-%d-%H-%M
kalyan-twitter-agent.sinks.HDFS.hdfs.fileType = DataStream
kalyan-twitter-agent.sinks.HDFS.hdfs.writeFormat = Text
kalyan-twitter-agent.sinks.HDFS.hdfs.batchSize = 1000
kalyan-twitter-agent.sinks.HDFS.hdfs.rollSize = 0
kalyan-twitter-agent.sinks.HDFS.hdfs.rollCount = 10000
kalyan-twitter-agent.sinks.HDFS.hdfs.useLocalTimeStamp = true

kalyan-twitter-agent.channels.MemChannel.type = memory
kalyan-twitter-agent.channels.MemChannel.capacity = 10000
kalyan-twitter-agent.channels.MemChannel.transactionCapacity = 100


2. Copy the "kalyan-twitter-agent.conf" file into the "$FLUME_HOME/conf" folder

3. Copy "kalyan-twitter.jar", "twitter4j-core-3.0.3.jar", "twitter4j-media-support-3.0.3.jar", "twitter4j-stream-3.0.3.jar" files into "$FLUME_HOME/lib" folder

Download the above jar files from the links below:







4. Execute the command below to extract data from Twitter using Flume

$FLUME_HOME/bin/flume-ng agent -n kalyan-twitter-agent --conf $FLUME_HOME/conf -f $FLUME_HOME/conf/kalyan-twitter-agent.conf -Dflume.root.logger=DEBUG,console

5. Verify the data in the console

 

6. Verify the data in the HDFS location "hdfs://localhost:8020/user/flume/tweets"
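
One way to check the HDFS location from a terminal is a recursive listing. The sketch below assumes the `hdfs` command from the Hadoop installation is on `PATH`; the function name is illustrative.

```shell
# Sketch: list what the HDFS sink has written so far.
# Assumes the hdfs command from the Hadoop installation is on PATH.
list_tweets() {
    hdfs dfs -ls -R "${1:-hdfs://localhost:8020/user/flume/tweets}"
}

# Usage: list_tweets
```

With the sink path configured above, each minute in which events arrived should show up as its own `%Y-%m-%d-%H-%M` directory.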
























Saturday 30 April 2016

MapReduce jobs fail to run: YARN issue caused by too little RAM

If MapReduce jobs cannot run because YARN has too little RAM, update "yarn-site.xml" with the properties below and restart YARN.

<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>512</value>
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>20480</value>
</property>
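
As a rough sanity check on these numbers, the sketch below computes how many containers of a given size fit into the NodeManager memory. It uses integer arithmetic on memory only and ignores CPU limits; the function name is illustrative.

```shell
# Sketch: how many containers of a given size fit on one NodeManager.
# $1 = yarn.nodemanager.resource.memory-mb, $2 = container size in MB.
containers_per_node() {
    echo $(( $1 / $2 ))
}

containers_per_node 20480 512    # prints 40 (minimum-size containers)
containers_per_node 20480 2048   # prints 10 (maximum-size containers)
```

If the machine has less memory than the configured 20480 MB to spare, lowering `yarn.nodemanager.resource.memory-mb` to what is actually available is probably the safer choice.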

How to change the hostname in Ubuntu

  1. Open the "/etc/hostname" file in a text editor: sudo gedit /etc/hostname
  2. Replace the existing hostname with the new one
  3. Open the "/etc/hosts" file in a text editor: sudo gedit /etc/hosts
  4. Replace the existing hostname with the new one
  5. Refresh the hostname: sudo service hostname restart
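
The two file edits can also be scripted. This is a minimal sketch, not a hardened tool: it replaces the old name wherever it appears as text in the hosts file, and it assumes hostnames made of letters, digits and hyphens only. The optional second argument (a hypothetical addition) lets you try it on a scratch copy first.

```shell
# Sketch: write the new hostname and update the hosts file to match.
# $1 = new hostname, $2 = root directory prefix (empty means the real /etc).
# Assumes the old hostname contains no characters special to sed.
set_hostname_files() {
    new=$1
    root=${2:-}
    old=$(cat "$root/etc/hostname") || return 1
    echo "$new" > "$root/etc/hostname" &&
    sed "s/$old/$new/g" "$root/etc/hosts" > "$root/etc/hosts.new" &&
    mv "$root/etc/hosts.new" "$root/etc/hosts"
}

# Usage (needs root for the real files): set_hostname_files newname
```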

Thursday 24 March 2016

12 KEY STEPS TO KEEP YOUR HADOOP CLUSTER RUNNING STRONG AND PERFORMING OPTIMALLY

1. Hadoop deployment on 64-bit OS:

  • A 32-bit OS limits the Java heap size to about 3GB, so make sure that the Hadoop Namenode/Datanode runs on a 64-bit OS.

2. Mapper and Reducer count setup:

This is a cluster-specific value and reflects the total number of mappers and reducers per tasktracker.
File | Parameter | Value | Notes
conf/mapred-site.xml | mapred.tasktracker.map.tasks.maximum | N | The maximum number of map task slots to run simultaneously
conf/mapred-site.xml | mapred.tasktracker.reduce.tasks.maximum | N | The maximum number of reduce task slots to run simultaneously

If no value is set, the default is 2; a value of -1 specifies that the number of map/reduce task slots is based on the total amount of memory reserved for MapReduce by the sysadmin.
To set this value, take the tasktracker's CPU (with or without hyper-threading), disk, and memory into account, along with how CPU intensive your job is on a scale of 1-10. For example, if the tasktracker is a quad-core box with hyper-threading, there are 4 physical and 4 virtual cores, 8 CPUs in total. For a highly CPU-intensive job we can assign 4 mapper and 4 reducer tasks; for a far less CPU-intensive job, we can have up to 40 mappers and 40 reducers.
The mapper and reducer counts do not need to be the same; it all depends on how the jobs are written. We can also have 6 mappers and 2 reducers, depending on how much work each mapper and reducer does; the job-specific counters provide this information. The number of mappers and reducers per tasktracker depends on CPU utilization per task. You can also look at each reduce task's counters to see how long the CPU was utilized out of the total map/reduce task time. If there is a long wait, you may need to reduce the count; if everything finishes very fast, that suggests there is room to add mapper or reducer slots per tasktracker.
Note that a mapper count larger than the number of physical CPU cores results in CPU context switching, which may slow overall job completion, whereas a configuration balanced against the available CPUs may complete jobs faster.
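
The CPU-intensive example above (8 logical CPUs, 4 mappers and 4 reducers) can be sketched as a starting-point calculation. The even split is only the example's choice, not a rule, and `nproc` (from GNU coreutils) is assumed to be available when no CPU count is passed in.

```shell
# Sketch of the CPU-bound example: split the logical CPUs evenly
# between map and reduce slots. $1 = logical CPU count (defaults to nproc).
suggest_slots() {
    cpus=${1:-$(nproc)}
    half=$(( cpus / 2 ))
    if [ "$half" -lt 1 ]; then
        half=1   # always keep at least one slot of each kind
    fi
    echo "map=$half reduce=$half"
}

suggest_slots 8   # prints map=4 reduce=4
```

For less CPU-intensive jobs the counts can be raised well beyond this, as described above, and then tuned from the job counters.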

3. Per Task JVM Memory Configuration:

This memory configuration should be set based on the total RAM in each tasktracker.
File | Parameter | Value | Notes
conf/mapred-site.xml | mapred.child.java.opts | -Xmx{YOUR_Value}M | Larger heap-size for child jvms of maps/reduces.

The value of the above parameter depends on the total number of mapper and reducer tasks per tasktracker, so you must know those two parameters before setting it. Here are a few ways to calculate proper values:
  • Let's consider 4 mappers and 4 reducers per tasktracker with 32GB total RAM in each machine
    • In this scenario there will be 8 tasks in total running on any tasktracker
    • Let's assume about 2-4 GB RAM is required for the tasktracker to perform other jobs, so about ~28GB RAM is available for Hadoop tasks
    • Now we can divide 28/8 and get 3.5GB RAM per task
    • The value in this case will be -Xmx3500M
  • Let's consider 8 mappers and 4 reducers per tasktracker with 32GB total RAM
    • In this scenario there will be 12 tasks in total running on any tasktracker
    • Let's assume about 2-4 GB RAM is required for the tasktracker to perform other jobs, so about ~28GB RAM is available for Hadoop tasks
    • Now we can divide 28/12 and get about 2.33GB RAM per task
    • Rounding down, the value in this case will be -Xmx2300M
  • Let's consider 12 mappers and 8 reducers per tasktracker with 128GB total RAM, where one specific node also works as the secondary namenode
    • It is not recommended to run the Secondary Namenode alongside a Datanode/TaskTracker; we keep it here only for the sake of the calculation.
    • In this scenario there will be 20 tasks in total running on any tasktracker
    • Let's assume about 8 GB RAM is required for the secondary namenode and 4GB for other jobs, which leaves 116GB; budgeting conservatively, about ~100GB RAM is available for Hadoop tasks
    • Now we can divide 100/20 and get 5GB RAM per task
    • The value in this case will be around -Xmx5000M
  • Note:
    • HDP 1.2 has some new JVM-specific configuration which can be used for much more granular memory settings.
    • If the Hadoop cluster does not have machines with identical memory (i.e. a mix of machines with 32GB and 64GB RAM), use the lower memory configuration as the baseline.
    • It is always best to leave ~20% of memory for other processes.
    • Do not overcommit memory across the total number of tasks; it will cause JVM OOM errors.
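
The calculations above all follow the same pattern: subtract the reserved RAM, then divide by the number of task slots. A minimal sketch (the function name is illustrative), using 1 GB = 1000 MB as the worked examples do:

```shell
# Sketch: per-task heap in MB for mapred.child.java.opts.
# $1 = total RAM in GB, $2 = RAM reserved for other processes in GB,
# $3 = map slots + reduce slots. Uses 1 GB = 1000 MB, as the examples do.
heap_mb() {
    echo $(( ($1 - $2) * 1000 / $3 ))
}

heap_mb 32 4 8      # prints 3500
heap_mb 32 4 12     # prints 2333
heap_mb 128 28 20   # prints 5000 (28 GB reserved leaves the ~100 GB budget)
```

Rounding the result down, for example 2333 to -Xmx2300M, keeps a little extra headroom.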

4. Setting mapper or reducer memory limit to unlimited:

Setting both mapred.job.{map|reduce}.memory.mb values to -1 or to the maximum lets mapreduce jobs use the maximum amount of memory available.
Parameter | Value | Notes
mapred.job.map.memory.mb | -1 | This property’s value sets the virtual memory size of a single map task for the job.
mapred.job.reduce.memory.mb | -1 | This property’s value sets the virtual memory size of a single reduce task for the job.

5. Setting No limit (or Maximum) for total number of tasks per job:

Setting this value to a certain limit puts constraints on mapreduce job completion and performance. It is best to set it to -1 so jobs can use the maximum available.
Parameter | Value | Notes
mapred.jobtracker.maxtasks.per.job | -1 | Set this property’s value to any positive integer to set the maximum number of tasks for a single job. The default value of -1 indicates that there is no maximum.

6. Memory configuration for sorting data within processes:

There are two values in this segment, io.sort.factor and io.sort.mb. Based on experience, io.sort.mb should be 25-30% of the mapred.child.java.opts value.
File | Parameter | Value | Notes
conf/core-site.xml | io.sort.factor | 100 | More streams merged at once while sorting files.
conf/core-site.xml | io.sort.mb | NNN | Higher memory-limit while sorting data.
So for example if mapred.child.java.opts is 2 GB, io.sort.mb can be 500MB, or if mapred.child.java.opts is 3.5 GB then io.sort.mb can be 768MB.
Also, after running a few mapreduce jobs, analyzing the log messages will help you determine a better setting for io.sort.mb. Note that a low io.sort.mb makes the sort procedure take a lot more time, whereas a higher value may result in job failure.
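
The 25-30% rule of thumb can be sketched as a one-line calculation (the function name is illustrative):

```shell
# Sketch: io.sort.mb as a percentage of the child JVM heap.
# $1 = child JVM heap in MB, $2 = percentage of the heap to use.
io_sort_mb() {
    echo $(( $1 * $2 / 100 ))
}

io_sort_mb 2000 25   # prints 500
io_sort_mb 2000 30   # prints 600
```

The 768MB suggested for a 3.5 GB heap works out to roughly 22% of the heap, so it sits slightly below the 25-30% range on the conservative side.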

7. Reducer Parallel copies configuration:

A large number of parallel copies causes high memory utilization and can lead to Java heap errors, while a small number causes slow job completion. Keeping this value at an optimum helps mapreduce jobs complete faster.
File | Parameter | Value | Notes
conf/mapred-site.xml | mapred.reduce.parallel.copies | 20 | The default number of parallel transfers run by reduce during the copy (shuffle) phase. Higher number of parallel copies run by reduces to fetch outputs from very large number of maps.
This value is very network specific. A larger value means higher network activity between tasktrackers. With more parallel reduce copies, reducers create many network connections, which can congest the network in a Hadoop cluster. A lower number helps keep network connectivity stable. Users should choose this number depending on the strength of their network. I think the recommended value can be between 12-18 in a gigabit network.

8. Setting Reducer Input limit to maximum:

Sometimes setting a lower limit to reducer input size may cause job failures. It is best to set the reducer input limit to maximum.
File | Parameter | Value | Notes
conf/mapred-site.xml | mapreduce.reduce.input.limit | -1 | The limit on the input size of the reduce. If the estimated input size of the reduce is greater than this value, job is failed. A value of -1 means that there is no limit set.

This value is based on disk size and available space in the tasktracker. So if there is a cluster in which each datanode has variation in configured disk space, setting a specific value may cause job failures. Setting this value to -1 helps reducers to work based on available space.

9. Setting Map input split size:

During mapreduce job execution, map tasks are created per split. Setting the split size to 0 lets the jobtracker decide the split size based on the data source.
Parameter | Value | Notes
mapred.min.split.size | 0 | The minimum size chunk that map input should be split into. File formats with minimum split sizes take priority over this setting.

10. Setting HDFS block size:

  • I have seen various Hadoop clusters running well with a variety of HDFS block sizes.
  • A user can set dfs.block.size in hdfs-site.xml to between 64MB and 1GB or more.

11. Setting user priority to “High” in Hadoop Cluster:

  • In Hadoop clusters, jobs are submitted based on user priority if certain types of job scheduler are configured
  • If a hadoop user has lower priority, its mapper and reducer tasks will have to wait longer to get task slots in the tasktracker. This can ultimately make mapreduce jobs take longer.
    • In some cases a timeout could occur and the mapreduce job may fail
  • If a job scheduler is configured, submitting a job as a user with high job scheduling priority will result in faster job completion in a Hadoop cluster.

12. Secondary Namenode or Highly Available Namenode Configuration:

  • Having a secondary namenode or Highly Available namenode helps the Hadoop cluster stay highly available.
  • However, I have seen some cases where the secondary namenode or HA namenode runs on a datanode, which could impact cluster performance.
  • Keeping the Secondary Namenode or Highly Available Namenode separate from the Datanode/JobTracker keeps dedicated resources available for the tasks assigned to the tasktracker.
