Monday 12 October 2015

What is Hadoop Rack Awareness and How to configure it in a cluster

Details about Hadoop Rack Awareness

Both the HDFS and Map/Reduce components of Hadoop are rack-aware.
The NameNode and the JobTracker obtain the rack id of the slaves in the cluster by invoking an API, resolve, in an administrator-configured module. The API resolves the slave's DNS name (or IP address) to a rack id. The module to use can be configured using the configuration item topology.node.switch.mapping.impl. The default implementation runs a script/command configured using topology.script.file.name.
If topology.script.file.name is not set, the rack id /default-rack is returned for any passed IP address. An additional configuration item on the Map/Reduce side is mapred.cache.task.levels, which determines the number of levels (in the network topology) of caches.
So, for example, if it is the default value of 2, two levels of caches will be constructed – one for hosts (host -> task mapping) and another for racks (rack -> task mapping).
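If the script-based approach does not fit, the mapping module can also be supplied as a Java class and configured through topology.node.switch.mapping.impl. The sketch below is illustrative only: it assumes the classic single-method org.apache.hadoop.net.DNSToSwitchMapping interface of Hadoop 1.x (later versions add further methods such as reloadCachedMappings), and the package name, class name and subnet-to-rack rule are all made up for the example.

package com.example.topology;  // hypothetical package

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.net.DNSToSwitchMapping;

// Illustrative mapping: hosts in the 10.1.1.z subnet are placed in /dc1/rack1,
// everything else falls back to /default-rack.
public class SimpleRackMapping implements DNSToSwitchMapping {

    public List<String> resolve(List<String> names) {
        List<String> racks = new ArrayList<String>();
        for (String name : names) {
            if (name.startsWith("10.1.1.")) {
                racks.add("/dc1/rack1");
            } else {
                racks.add("/default-rack");
            }
        }
        return racks;
    }
}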

What is Rack Awareness in Hadoop

For small clusters in which all servers are connected by a single switch, there are only two levels of locality: “on-machine” and “off-machine.” When loading data from a DataNode’s local drive into HDFS, the NameNode will schedule one copy to go into the local DataNode, and will pick two other machines at random from the cluster.
For larger Hadoop installations which span multiple racks, it is important to ensure that replicas of data exist on multiple racks. This way, the loss of a switch does not render portions of the data unavailable due to all replicas being underneath it.
HDFS can be made rack-aware by the use of a script which allows the master node to map the network topology of the cluster. While alternate configuration strategies can be used, the default implementation allows you to provide an executable script which returns the “rack address” of each of a list of IP addresses.
The network topology script receives as arguments one or more IP addresses of nodes in the cluster. It returns on stdout a list of rack names, one for each input. The input and output order must be consistent.
To set the rack mapping script, specify the key topology.script.file.name in conf/hadoop-site.xml. This provides a command to run to return a rack id; it must be an executable script or program. By default, Hadoop will attempt to send a set of IP addresses to the script as several separate command line arguments. You can control the maximum acceptable number of arguments with the topology.script.number.args key.
Rack ids in Hadoop are hierarchical and look like path names. By default, every node has a rack id of /default-rack. You can set rack ids for nodes to any arbitrary path, e.g., /foo/bar-rack. Path elements further to the left are higher up the tree. Thus a reasonable structure for a large installation may be /top-switch-name/rack-name.
Hadoop rack ids are not currently expressive enough to handle an unusual routing topology such as a 3-d torus; they assume that each node is connected to a single switch which in turn has a single upstream switch. This is not usually a problem, however. Actual packet routing will be directed using the topology discovered by or set in switches and routers. The Hadoop rack ids will be used to find “near” and “far” nodes for replica placement (and in 0.17, MapReduce task placement).
The following example script performs rack identification based on IP addresses given a hierarchical IP addressing scheme enforced by the network administrator. This may work directly for simple installations; more complex network configurations may require a file- or table-based lookup process. Care should be taken in that case to keep the table up-to-date as nodes are physically relocated, etc. This script requires that the maximum number of arguments be set to 1.
#!/bin/bash
# Set rack id based on IP address.
# Assumes the network administrator has complete control
# over IP addresses assigned to nodes and that they are
# in the 10.x.y.z address space. Assumes that
# IP addresses are distributed hierarchically, e.g.,
# 10.1.y.z is one data center segment and 10.2.y.z is another;
# 10.1.1.z is one rack, 10.1.2.z is another rack in
# the same segment, etc.
#
# This is invoked with an IP address as its only argument.

# get the IP address from the first argument
ipaddr=$1

# select "x.y" and convert it to "x/y"
segments=`echo $ipaddr | cut --delimiter=. --fields=2-3 --output-delimiter=/`
echo /${segments}



Configuring Rack Awareness in Hadoop

We know that Hadoop divides data into blocks and stores replicas of each block on different machines. If Rack Awareness is not configured, Hadoop may place all replicas of a block in the same rack, which results in loss of data when that rack fails.
Although rare, since a rack failure is less frequent than a node failure, this can be avoided by explicitly configuring Rack Awareness in core-site.xml.
Rack awareness is configured using the property "topology.script.file.name" in core-site.xml.
If topology.script.file.name is not configured, /default-rack is returned for every IP address, i.e., all nodes are assumed to be on the same rack.
Configuring Rack Awareness in Hadoop involves two steps:
First, configure the "topology.script.file.name" property in core-site.xml:
<property>
  <name>topology.node.switch.mapping.impl</name>
  <value>org.apache.hadoop.net.ScriptBasedMapping</value>
  <description>
    The default implementation of the DNSToSwitchMapping. It
    invokes a script specified in topology.script.file.name to resolve
    node names. If the value for topology.script.file.name is not set,
    the default value of DEFAULT_RACK is returned for all node names.
  </description>
</property>

<property>
  <name>topology.script.file.name</name>
  <value>core/rack-awareness.sh</value>
</property>

Second, implement the rack-awareness.sh script as desired. A sample rack-awareness script and its topology data file are shown below.
1. Topology Script file named as : rack-awareness.sh

A sample Bash shell script:

#!/bin/bash
# Resolve each host name / IP address passed on the command line to a rack id
# by looking it up in ${HADOOP_CONF}/topology.data.

HADOOP_CONF=$HADOOP_HOME/conf

while [ $# -gt 0 ] ; do
  nodeArg=$1
  exec< ${HADOOP_CONF}/topology.data 
  result="" 
  while read line ; do
    ar=( $line ) 
    if [ "${ar[0]}" = "$nodeArg" ] ; then
      result="${ar[1]}"
    fi
  done 
  shift 
  if [ -z "$result" ] ; then
    echo -n "/default-rack "
  else
    echo -n "$result "
  fi
done 

2. Topology data file named as : topology.data

orienit.node1.com    /dc1/rack1
orienit.node2.com    /dc1/rack1
orienit.node3.com    /dc1/rack1
orienit.node11       /dc1/rack2
orienit.node12       /dc1/rack2
orienit.node13       /dc1/rack2
10.1.1.1             /dc1/rack3
10.1.1.2             /dc1/rack3
10.1.1.3             /dc1/rack3
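Assuming topology.data is placed under $HADOOP_HOME/conf, the script can be tested from the command line by passing it one or more host names or IP addresses: invoking it with the arguments 10.1.1.1 and orienit.node11, for example, should print /dc1/rack3 /dc1/rack2, while an unknown host falls back to /default-rack.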

Sunday 11 October 2015

Introduction to Machine Learning

This article provides you with the fundamentals of Machine Learning by explaining supervised and unsupervised learning along with the various tasks that are performed using machine learning algorithms.

What is Machine Learning?
Machine Learning is the study and design of algorithms that can learn from and make predictions on data. This is achieved by building a model from the sample data, which is then used to make data-driven predictions.
More formally, Machine learning is described by Tom Mitchell as: "A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E"
Example: playing checkers.
  • E = the experience of playing many games of checkers
  • T = the task of playing checkers.
  • P = the probability that the program will win the next game.
Machine learning can be classified as Supervised or Unsupervised. Supervised learning is the type of learning that takes place when the training instances are labelled with the correct result, which gives feedback about how learning is progressing. In unsupervised learning, the goal is harder because there are no pre-determined categorizations.

Supervised Learning
In supervised learning, we are given a data set and already know what our correct output should look like, having the idea that there is a relationship between the input and the output. It is the task of inferring a function from labelled training data. The diagram below depicts the supervised learning model -

Supervised learning problems are categorized into Regression and Classification problems as described in the following sections.
  • Regression - In a regression problem, we are trying to predict results within a continuous output, meaning that we are trying to map input variables to some continuous function. It involves modelling and analysing several variables, consisting of one dependent variable and one or more independent variables. It helps us understand how the dependent variable varies when one of the independent variables is varied, keeping all other variables fixed. Example: given data about the size of houses on the real estate market, try to predict their price. Price as a function of size is a continuous output, so this is a regression problem. (A minimal code sketch of linear regression follows this list.)
    Here are some of the popular Regression techniques:
    1. Linear Regression - Linear regression is used to try and fit the data into a straight line. It models the linear relationship between a dependent variable and one or more independent variables. Linear regression can be used to forecast/predict the dependent variable based on the observed data set if the relation between the variables is known to be almost linear.
    2. Locally Weighted Regression - Linear regression tries to fit a straight line to the data, which is not a good fit in cases where the relationship is not linear or the data is too noisy. In such cases we use LWR. LWR addresses this problem by assigning weights to the training data; weights are larger for data points closer to the point we are trying to predict. Since LWR requires the entire data set every time (because the weights change), it is computationally expensive.
    3. Logistic Regression - Unlike linear regression where the output is a continuous function, in logistic regression the output can have only a limited number of discrete values. It is used when the dependent variable is of binary or discrete nature.
    4. Non-linear Regression - Nonlinear equations can take multiple forms. If the dependent variable cannot be modelled as a linear function of the independent variables, we use nonlinear regression to find a best fit model. The resulting model could be exponential, logarithmic, trigonometric etc.
  • Classification - In a classification problem, we are trying to predict results in a discrete output. In other words, we are trying to map input variables into discrete categories on the basis of the training data set. An algorithm which implements classification is known as a classifier. Example: predicting whether a house "sells for more or less than the asking price" classifies houses into two discrete categories based on price.
    Here are some of the popular Classification techniques:
    1. Decision Tree Classifier - This methodology uses a decision tree as the predictive model. It is used in cases where all the features have a finite discrete domain and there is a single target feature. The tree is created using the sample data where each internal node splits into 2 or more sub trees according to the discrete function of the input attribute value.
    2. Naive Bayes Classifier - Naive Bayes is a family of classifiers that work on the assumption that the value of a particular feature is independent of the value of any other feature (hence "naive"). The model assigns class labels to the data, represented as vectors of feature values. It is based on Bayes' theorem and is hence probabilistic in nature. This classification technique is used mostly in text classification (spam/not spam, or sports, politics or entertainment, etc.).
    3. Random Forests Classifier - This model is an extension of decision tree classifier. Many classification trees are grown to classify a new object from an input vector. Each tree then gives a classification, and we say the tree votes for that class. The forest chooses the class which has the maximum number of votes.
    4. Hidden Markov Model Classifier - It is a statistical model of a process consisting of two random variables, say A and B, which change their state sequentially. One of the two variables, A, is termed the hidden variable since its state cannot be observed directly. The state of A changes with the Markov property, i.e. the state change probability depends only on its current state and does not change over time. The variable B is called the observed variable since its state can be directly observed. B does not follow the Markov property, but its state probability statistically depends on the current state of A.
    5. Multi-layer Perceptron - A multilayer perceptron is a biologically inspired feed-forward network that can be trained to represent a nonlinear mapping between input and output data. It consists of multiple layers, each containing multiple artificial neuron units and can be used for classification and regression tasks in a supervised learning approach.
    6. K-nearest Neighbours - In k-NN classification, the object is classified by a majority vote of its neighbours. The object is assigned to the class which is most common among its k nearest neighbours. Weights are generally assigned to the neighbours while using this algorithm.
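To make the regression idea referenced above concrete, here is a small, self-contained Java sketch (not tied to any particular ML library) that fits a one-variable linear model y = a + b*x by ordinary least squares; the house-size and price numbers are made up purely for illustration.

public class SimpleLinearRegression {

    public static void main(String[] args) {
        // toy training data: house size in sq. ft. (x) and price in thousands (y)
        double[] x = {1000, 1500, 2000, 2500, 3000};
        double[] y = {200, 280, 360, 445, 520};

        int n = x.length;
        double sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;
        for (int i = 0; i < n; i++) {
            sumX += x[i];
            sumY += y[i];
            sumXY += x[i] * y[i];
            sumXX += x[i] * x[i];
        }

        // ordinary least squares estimates of slope (b) and intercept (a)
        double b = (n * sumXY - sumX * sumY) / (n * sumXX - sumX * sumX);
        double a = (sumY - b * sumX) / n;

        // predict the price of a 2200 sq. ft. house
        System.out.printf("y = %.2f + %.4f * x, predicted price for 2200 sq. ft. = %.1f%n",
                a, b, a + b * 2200);
    }
}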

Unsupervised Learning
Unsupervised learning, on the other hand, allows us to approach problems with little or no idea what our results should look like. We can derive structure from data where we do not necessarily know the effect of the variables. The following diagram depicts the unsupervised learning model -


We can derive this structure by clustering the data based on relationships among the variables in the data. With unsupervised learning there is no feedback based on the prediction results, i.e., there is no teacher to correct you. It is not just about clustering. For example, associative memory is unsupervised learning.
Unsupervised learning problems are categorized into Clustering and Collaborative filtering problems as described in the following sections.
  • Clustering - Clustering is the division of observations into clusters or groups such that all observations within a cluster have some similarity between them. Unlike classification, we are not aware of the types of clusters that will be formed at the end of the clustering algorithm, which is why it falls under unsupervised learning. Here are some of the popular Clustering techniques (a minimal K-Means sketch follows this list):
    1. Canopy Clustering - It is a pre-clustering algorithm used as a pre-processing step for K-Means algorithm. It is used to speed up the clustering process on large data sets, where using another algorithm directly would be impractical.
    2. K-means Clustering - K-Means clustering is used to partition n observations into k sets, where each observation belongs to the cluster with the nearest mean. In other words, the model divides the observation into k sets such that the within-cluster sum of squares is minimized.
    3. Fuzzy K-means Clustering - Unlike K-Means clustering, where each observation belongs to exactly one cluster, in Fuzzy K-Means clustering each observation can belong to multiple clusters with varying probability. Fuzzy K-Means tries to deal with the problem where points are somewhat in between centers.
    4. Streaming K-means Clustering - Streaming K-Means Clustering is used in cases when data set is too large to fit into memory as a whole. It consists of two major steps, Streaming step and BallKMeans step. In streaming step, a single pass over the data produces as many centroids as it determines is optimal. This data is then passed through the BallKMeans step which further reduces the number of centroids to K.
    5. Spectral Clustering - The goal of spectral clustering is to cluster data that is connected but not necessarily compact or clustered within convex boundaries.
    6. Mean Shift Clustering - The mean shift algorithm is a nonparametric clustering technique which does not require prior knowledge of the number of clusters, and does not constrain the shape of the clusters. It works by treating the points as an empirical probability function where dense regions correspond to local maxima.
    7. Correlation Clustering - Correlation Clustering is applied in cases where the actual data set is not known, but the relations between the points in the data set are known. This model does not require prior knowledge of k, i.e. the number of clusters to be formed.
  • Collaborative Filtering - Collaborative Filtering (CF) is the process of making automatic predictions about the interests of a user based on his interest/disinterest similarity with other users. It is based on the assumption that if a user A has the same interest as a user B on some issue, then A is more likely than a randomly chosen person to share B's interest on some other issue x. Example: CF can be used to predict which food item a user would like based on a partial list of his likes and dislikes.
    Collaborative Filtering can be classified as:
    1. User-based Collaborative Filtering - This CF technique has 2 major steps. First, we look for users who share the same preferences or interests as the active user. Then we use the ratings received from that set of like-minded users to predict the interests of the active user. To implement this model, neighbourhood-based algorithms are generally used: a subset of users is chosen based on their similarity to the active user, and a weighted combination of their ratings is used as the predicted rating for the active user.
    2. Item-based Collaborative Filtering - Item based CF calculates the similarity between items based on the people's rating of those items. This is achieved by firstly finding similarity between all pairs of items. Once this step is completed, the system uses the most similar items to a user's already-rated items to generate a list of recommended items.
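As a concrete illustration of the clustering idea above, here is a minimal, self-contained Java sketch of the K-Means loop (assignment step followed by centroid update) on one-dimensional toy data with k = 2. Production implementations work on multi-dimensional vectors, choose initial centroids more carefully and stop once the centroids stop moving; the numbers here are made up purely for illustration.

import java.util.Arrays;

public class KMeansSketch {

    public static void main(String[] args) {
        double[] points = {1.0, 1.5, 1.8, 8.0, 8.5, 9.2};   // toy 1-D data
        double[] centroids = {1.0, 9.0};                     // initial guesses, k = 2

        for (int iter = 0; iter < 10; iter++) {
            double[] sum = new double[centroids.length];
            int[] count = new int[centroids.length];

            // assignment step: each point goes to its nearest centroid
            for (double p : points) {
                int nearest = 0;
                for (int c = 1; c < centroids.length; c++) {
                    if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[nearest])) {
                        nearest = c;
                    }
                }
                sum[nearest] += p;
                count[nearest]++;
            }

            // update step: move each centroid to the mean of its assigned points
            for (int c = 0; c < centroids.length; c++) {
                if (count[c] > 0) {
                    centroids[c] = sum[c] / count[c];
                }
            }
        }
        System.out.println("Final centroids: " + Arrays.toString(centroids));
    }
}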

Distributed Machine Learning Tools and Frameworks
Many tools and frameworks have come up to help perform ML Techniques on Big Data in a distributed environment. Some of the popular ones have been listed below.
  • Apache Mahout: Apache Mahout provides implementations of scalable, distributed machine learning algorithms. Most of these implementations run on the Apache Hadoop platform.
  • R: R is a free software environment for statistical computing and graphics. It has been used extensively for implementing ML algorithms. R packages are available which make it possible to run these ML algorithms in a distributed environment such as Hadoop or H2O. For example, when using R with H2O, R tells H2O to perform a task and H2O returns only a small result back to R, so you never actually transfer the data to R.
  • Petuum: Petuum provides both tools and pre-implemented algorithms to perform ML at large scale. Its library of distributed ML algorithms can be used at massive scale for Big Data analytics.
  • Jubatus: Jubatus is a distributed processing framework and streaming machine learning library. It provides pre-implemented algorithms for Classification, Regression, Recommendation (Nearest Neighbour Search), Graph Mining, Anomaly Detection and Clustering, among others.


Spouts, Bolts, Streams and Topologies in Apache Storm

This tutorial provides the details of various Apache Storm primitives such as Spouts, Bolts, Stream groupings and Topologies.

Abstract
Apache Storm is a free and open source distributed real-time computation system that is scalable, reliable and easy to set up and maintain. An Apache Storm cluster is made up of two types of processes - Nimbus and Supervisor. Nimbus is a process running on the master node that is responsible for tracking the progress of data processing, while the Supervisor process runs on worker nodes and is responsible for executing the data processing logic.

For a better and detailed knowledge of Apache Storm cluster, you are recommended to go through the tutorial - Introduction to Apache Storm.

In this tutorial, we will be discussing the various Apache Storm primitives that are as follows:
  • Spouts
  • Bolts
  • Streams
  • Topologies

Spouts
Spouts represent the source of data in Storm. You can write spouts to read data from data sources such as databases, distributed file systems, messaging frameworks, etc. Spouts can broadly be classified as follows -
  • Reliable - These spouts have the capability to replay the tuples (a tuple is a unit of data in the data stream). This helps applications achieve an 'at least once message processing' semantic since, in case of failures, tuples can be replayed and processed again. Spouts that fetch data from messaging frameworks are generally reliable as these frameworks provide a mechanism to replay the messages.
  • Unreliable - These spouts don't have the capability to replay the tuples. Once a tuple is emitted, it cannot be replayed irrespective of whether it was processed successfully or not. Such spouts follow an 'at most once message processing' semantic. The snippet after this list illustrates the difference in the emit calls.
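In the Java spout API the difference comes down to whether nextTuple emits each tuple with a message id. The fragment below is illustrative only (msgId is a hypothetical value the spout could use to re-fetch the data, such as a queue offset):

// reliable emit: Storm tracks this tuple by msgId and later calls ack()/fail()
// on the spout, so the tuple can be replayed on failure
_collector.emit(new Values(word), msgId);

// unreliable emit: no message id, so the tuple can never be replayed
_collector.emit(new Values(word));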

Below diagram shows Spout hierarchy along with some important spouts that come bundled with Apache Storm. BaseRichSpout is an important class and all your Java spouts should extend it.



As we can see, there are some useful spouts that are ready to be used. E.g., KafkaSpout can be used to read messages off Kafka topics.


Here is a sample spout emitting random words -

backtype.storm.testing.TestWordSpout
package backtype.storm.testing;

import backtype.storm.Config;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
    
    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {
        
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}

Some of the important methods in the above spout definition are open (used for initialization work), nextTuple (called by Storm to get the next tuple) and declareOutputFields (used for associating field names with emitted tuples).

Bolts
Bolts represent the processing logic unit in Storm. One can utilize bolts to do any kind of processing such as filtering, aggregating, joining, interacting with data stores, talking to external systems etc.

Similar to spouts, bolts can also emit tuples (data messages) for subsequent bolts to process. Additionally, bolts are responsible for acknowledging the tuples they have finished processing. A spout considers a tuple fully processed when it has received acknowledgement for that tuple as well as for all the tuples emitted by bolts as part of processing that original tuple.

E.g., a spout emits a tuple t1 that goes to bolt b1 for processing. Bolt b1 processes t1, emits another tuple t2 and acknowledges the processing of tuple t1. At this point, even though tuple t1 has been acknowledged, the spout will not consider it fully processed, as tuple t2, emitted as part of its processing, is still not acknowledged. Tuple t2 then goes to bolt b2 for processing and gets acknowledged. As a result, tuple t1 is considered fully processed by the spout.

Below diagram shows Bolt hierarchy of Apache Storm.


There are following two important classes that Java bolts can extend from:
  • BaseBasicBolt - BaseBasicBolt does the acknowledgement of tuples automatically for you and hence you just need to focus on processing logic.
  • BaseRichBolt - On the other hand, BaseRichBolt requires you to acknowledge tuples explicitly. This can be helpful in scenarios where acknowledgements are not needed for tuples (tuples emitted without a message id).

Here is a sample bolt that adds exclamation marks to the words emitted by the spout and emits a new tuple for downstream bolts -

storm.starter.ExclamationBolt
package storm.starter;

import java.util.Collections;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class ExclamationBolt extends BaseBasicBolt {

  private static final long serialVersionUID = 1L;

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    collector.emit(Collections.singletonList((Object) (input.getString(0) + "!!!")));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}


Since this bolt extends BaseBasicBolt, it does not explicitly acknowledge the tuples in its execute method; a BaseRichBolt version of the same bolt, with explicit anchoring and acknowledgement, is sketched below.
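For comparison, here is an illustrative BaseRichBolt version of the same bolt (the class name ExclamationRichBolt is hypothetical), written against the 0.9.x-era backtype.storm API used in this post; the output tuple is anchored to the input tuple and the acknowledgement is done explicitly:

package storm.starter;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ExclamationRichBolt extends BaseRichBolt {

  private static final long serialVersionUID = 1L;
  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    // anchor the new tuple to the input tuple so that failures are tracked upstream
    collector.emit(input, new Values(input.getString(0) + "!!!"));
    // explicit acknowledgement - BaseRichBolt does not do this for you
    collector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}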

Streams
Streams represent unbounded sequences of tuples, where a tuple is a unit of data. A stream of tuples flows from a spout to bolt(s) or from bolt(s) to other bolt(s). Apache Storm provides various stream grouping techniques to let you define how the data should flow in a topology. Here are some of the stream grouping techniques:
  1. Shuffle grouping - This type of grouping distributes tuples equally and randomly to all the available bolt tasks.
  2. Fields grouping - This type of grouping makes sure that tuples with the same value of the grouping field(s) go to the same bolt task. For example, if the stream is grouped by the "word" field, tuples with the same "word" value will always go to the same bolt task (see the wiring snippet after this list).
  3. Partial Key grouping - The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed.
  4. All grouping - In this grouping technique, the stream is not load balanced; every tuple is replicated to all of the downstream bolt tasks.
  5. Global grouping - In this grouping technique, the entire stream goes to a single downstream bolt task. This needs to be used with caution, as it serializes the processing of tuples and can result in slow processing.
  6. None grouping - This grouping is just an indicator that you don't care about grouping and would like to go with default. Currently, default grouping is shuffle grouping and may change in future releases so this should be used carefully.
  7. Direct grouping - In this grouping, producer of a tuple decides which task of the consumer will receive the emitted tuple. This is only applicable for streams declared as Direct stream.
  8. Local or shuffle grouping - Since a worker process can have multiple tasks, this grouping prefers to shuffle to the target bolt's tasks running in the same worker process. If the worker process is not running any tasks of the target bolt, it acts as a normal shuffle grouping.
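As an example of how a grouping is wired into a topology, the snippet below shows a hypothetical word-count style wiring (the component ids "split" and "count" and the WordCountBolt class are placeholders, not part of the exclamation example); fieldsGrouping ensures every tuple carrying the same "word" value lands on the same one of the four "count" bolt tasks:

// hypothetical wiring: all tuples with the same "word" value are routed
// to the same one of the 4 "count" bolt tasks
builder.setBolt("count", new WordCountBolt(), 4)
       .fieldsGrouping("split", new Fields("word"));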

A new stream grouping technique can be developed by implementing the backtype.storm.grouping.CustomStreamGrouping interface.
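A custom grouping is simply a serializable class implementing that interface. Below is a minimal sketch, assuming the 0.9.x-era backtype.storm API used throughout this post (the class name FirstTaskGrouping is hypothetical); it always routes tuples to the first target task, which behaves much like a global grouping, and could be attached with builder.setBolt(...).customGrouping("someComponent", new FirstTaskGrouping()).

package storm.starter;   // hypothetical location

import java.util.Arrays;
import java.util.List;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

public class FirstTaskGrouping implements CustomStreamGrouping {

  private static final long serialVersionUID = 1L;
  private List<Integer> targetTasks;

  @Override
  public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
                      List<Integer> targetTasks) {
    // remember the task ids of the consuming bolt
    this.targetTasks = targetTasks;
  }

  @Override
  public List<Integer> chooseTasks(int taskId, List<Object> values) {
    // always pick the first target task (behaves like a global grouping)
    return Arrays.asList(targetTasks.get(0));
  }
}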

Topologies
A topology in Storm represents the graph of computation and is implemented as a DAG (Directed Acyclic Graph) data structure. Each node of this graph contains data processing logic (bolts), while the connecting edges define the flow of data (streams). Storm keeps the topology running forever until you kill it.

Here is a sample topology called ExclamationTopology that appends exclamation marks twice (once per bolt stage) to the words emitted by TestWordSpout using ExclamationBolt:

storm.starter.ExclamationTopology
package storm.starter;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class ExclamationTopology {

  /**
   * @param args
   */
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

      builder.setSpout("word", new TestWordSpout(), 10);
      builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
      builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

      Config conf = new Config();
      StormSubmitter.submitTopology("ExclamationTopology", conf, builder.createTopology());
  }

}



Once you are done writing spouts, bolts and topology, you need to build a jar file containing all of these. Here is the command that you can use to run the exclamation topology in Apache Storm:

storm.starter.ExclamationTopology
$ storm jar exclamation-topology-jar-file.jar storm.starter.ExclamationTopology

Introduction to Apache Storm

This article introduces you to Apache Storm, a real-time distributed processing / computing framework for big data, by providing details of its technical architecture along with the use cases it could be utilized in.

Abstract
---------------------------
Apache Storm, in simple terms, is a distributed framework for real-time processing of Big Data, just as Apache Hadoop is a distributed framework for batch processing. Apache Storm works on the task parallelism principle, wherein the same code is executed on multiple nodes with different input data.
Apache Storm does not have any state-managing capabilities of its own. It instead utilizes Apache ZooKeeper to manage its cluster state, such as message acknowledgements, processing status, etc. This enables Storm to resume right from where it left off even after a restart.
Since Storm's master node (called Nimbus) is a Thrift service, one can create and submit a processing logic graph (called a topology) in any programming language. Moreover, it is scalable, fault-tolerant and guarantees that input data will be processed.

Technical Architecture
--------------------------------------------------------
Here is the architecture diagram depicting the technical architecture of Apache Storm -



The following two types of node services are shown in the above diagram -
  1. Nimbus Service on Master Node - Nimbus is a daemon that runs on the master node of a Storm cluster. It is responsible for distributing the code among the worker nodes, assigning input data sets to machines for processing and monitoring for failures. The Nimbus service is an Apache Thrift service, enabling you to submit the code in any programming language. This way, you can always utilize the language that you are proficient in, without the need to learn a new language to utilize Apache Storm.
    The Nimbus service relies on the Apache ZooKeeper service to monitor the message processing tasks, as all the worker nodes update their task status in the Apache ZooKeeper service.
  2. Supervisor Service on Worker Node - All the worker nodes in a Storm cluster run a daemon called Supervisor. The Supervisor service receives the work assigned to a machine by the Nimbus service. Supervisor manages worker processes to complete the tasks assigned by Nimbus. Each of these worker processes executes a subset of a topology, which we will talk about next.
And here are the important high level components that we have in each Supervisor node.
  1. Topology - A topology, in simple terms, is a graph of computation. Each node in a topology contains processing logic, and the links between nodes indicate how data should be passed around between them. A topology typically runs distributed across multiple worker processes on multiple worker nodes.
  2. Spout - A topology starts with a spout, the source of streams. A stream is made of an unbounded sequence of tuples. A spout may read tuples off a messaging framework and emit them as a stream of messages, or it may connect to the Twitter API and emit a stream of tweets. In the above technical architecture diagram, a topology is shown with two spouts (sources of streams).
  3. Bolt - A bolt represents a node in a topology. It defines the smallest unit of processing logic within a topology. The output of a bolt can be fed into another bolt as input. In the above technical architecture diagram, a topology is shown with five bolts to process the data coming from two spouts.

Benefits
--------------------------
Apache Storm comes with the following benefits to provide your application with a robust real time processing engine -
  1. Scalable - Storm scales to massive numbers of messages per second. To scale a topology, all you have to do is add machines and increase the parallelism settings of the topology. As an example of Storm's scale, one of Storm's initial applications processed 1,000,000 messages per second on a 10 node cluster, including hundreds of database calls per second as part of the topology. Storm's usage of ZooKeeper for cluster coordination makes it scale to much larger cluster sizes.
  2. Guarantees no data loss - A realtime system must have strong guarantees about data being successfully processed. A system that drops data has a very limited set of use cases. Storm guarantees that every message will be processed, and this is in direct contrast with other systems like S4.
  3. Extremely robust - Unlike systems like Hadoop, which are notorious for being difficult to manage, Storm clusters just work. It is an explicit goal of the Storm project to make the user experience of managing Storm clusters as painless as possible.
  4. Fault-tolerant - If there are faults during execution of your computation, Storm will reassign tasks as necessary. Storm makes sure that a computation can run forever (or until you kill the computation).
  5. Programming language agnostic - Robust and scalable realtime processing shouldn't be limited to a single platform. Storm topologies and processing components can be defined in any language, making Storm accessible to nearly anyone.
Possible Use Cases
Apache Storm, being a real-time processing framework, is suitable for an extremely broad set of use cases, some of which are as follows -
  • Stream processing - Processing messages and updating databases
  • Continuous computation - Doing a continuous query on data streams and streaming the results into clients
  • Distributed RPC - Parallelizing an intense query like a search query on the fly
  • Real-time analytics
  • Online machine learning
You can find information about the companies using Apache Storm here.