Showing posts with label Storm. Show all posts

Sunday 11 October 2015

Spouts, Bolts, Streams and Topologies in Apache Storm

This tutorial describes the main Apache Storm primitives: spouts, bolts, stream groupings and topologies.

Abstract
Apache Storm is a free and open source distributed real-time computation system that is scalable, reliable and easy to set up and maintain. An Apache Storm cluster is made up of two types of processes: Nimbus and Supervisor. Nimbus runs on the master node and is responsible for tracking the progress of data processing, while the Supervisor process runs on each worker node and is responsible for executing the data processing logic.

For a more detailed look at an Apache Storm cluster, see the tutorial Introduction to Apache Storm.

This tutorial discusses the following Apache Storm primitives:
  • Spouts
  • Bolts
  • Streams
  • Topologies

Spouts
Spouts represent the source of data in Storm. You can write spouts to read data from sources such as databases, distributed file systems and messaging frameworks. Spouts fall broadly into two categories -
  • Reliable - These spouts can replay tuples (a tuple is a unit of data in a data stream). This helps applications achieve 'at least once' message processing semantics: in case of failure, tuples can be replayed and processed again. Spouts that fetch data from messaging frameworks are generally reliable, as these frameworks provide a mechanism to replay messages.
  • Unreliable - These spouts cannot replay tuples. Once a tuple is emitted, it cannot be replayed, irrespective of whether it was processed successfully or not. Spouts of this type follow 'at most once' message processing semantics.
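
The difference between the two categories can be sketched in plain Java. This is only a model of the idea, not Storm API: the class ReplayableSource and all of its method names are hypothetical. It keeps every emitted message in a pending map until it is acknowledged, and queues it again on failure, which is what gives 'at least once' behaviour.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Plain-Java model of a reliable source; hypothetical names, not Storm API.
class ReplayableSource {
    private final Queue<String> incoming = new ArrayDeque<>();
    // Messages that were emitted but not yet acknowledged, keyed by message id.
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0;

    void offer(String message) {
        incoming.add(message);
    }

    // Emits the next waiting message and returns its id, or -1 if nothing is waiting.
    long emitNext() {
        String message = incoming.poll();
        if (message == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, message);
        return id;
    }

    String payload(long id) {
        return pending.get(id);
    }

    // Processing succeeded: forget the message.
    void ack(long id) {
        pending.remove(id);
    }

    // Processing failed: queue the message again so it is emitted once more.
    void fail(long id) {
        String message = pending.remove(id);
        if (message != null) {
            incoming.add(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int waitingCount() {
        return incoming.size();
    }
}
```

An unreliable source would correspond to the same class without the pending map: once emitNext returns, the message is gone, so a failure cannot be replayed.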

The diagram below shows the spout hierarchy, along with some important spouts that come bundled with Apache Storm. BaseRichSpout is an important class and all your Java spouts should extend it.



As the diagram shows, some useful spouts are ready to be used. For example, KafkaSpout can be used to read messages off Kafka topics.


Here is a sample spout that emits random words -

backtype.storm.testing.TestWordSpout
package backtype.storm.testing;

import backtype.storm.Config;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
    
    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {
        
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}

Some of the important methods in the above spout definition are open (used for initialization work), nextTuple (called by Storm to get the next tuple) and declareOutputFields (used to associate field names with the emitted tuples).
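
To illustrate what associating field names with emitted tuples means, here is a small plain-Java model. It is not Storm API: the class NamedTuple is hypothetical. It pairs the declared field names with the emitted values by position, so a consumer can read a value either by index or by name.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of declared output fields; hypothetical class, not Storm API.
class NamedTuple {
    private final List<String> fieldNames; // what declareOutputFields would declare
    private final List<Object> values;     // what emit would send

    NamedTuple(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("one value is required per declared field");
        }
        this.fieldNames = fieldNames;
        this.values = values;
    }

    // Positional access: index 0 is the first declared field.
    Object getValue(int index) {
        return values.get(index);
    }

    // Named access: look up the position of the field, then read the value.
    Object getValueByField(String name) {
        int index = fieldNames.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return values.get(index);
    }
}
```

With a single declared field "word", as in TestWordSpout, position 0 and the name "word" refer to the same value.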

Bolts
Bolts represent the processing logic unit in Storm. One can utilize bolts to do any kind of processing such as filtering, aggregating, joining, interacting with data stores, talking to external systems etc.

Similar to spouts, bolts can also emit tuples (data messages) for subsequent bolts to process. Additionally, bolts are responsible for acknowledging tuples once they are done processing them. A spout considers a tuple fully processed when it has received an acknowledgement for that tuple as well as for all the tuples emitted by bolts as part of processing that original tuple.

For example, a spout emits a tuple t1 that goes to bolt b1 for processing. Bolt b1 processes t1, emits another tuple t2 and acknowledges t1. At this point, even though t1 has been acknowledged, the spout will not consider it fully processed, because tuple t2, emitted as part of its processing, is still not acknowledged. Tuple t2 goes to bolt b2 for processing and gets acknowledged. As a result, tuple t1 is now considered fully processed by the spout.

The diagram below shows the bolt hierarchy of Apache Storm.
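
The t1/t2 example can be modelled in a few lines of plain Java. This is a simplified sketch, not Storm API: the class TupleTreeTracker is hypothetical. Storm's documentation describes a similar XOR-based scheme for its acker tasks, but the real mechanism has more moving parts than shown here.

```java
// Simplified plain-Java model of tracking a tuple tree; hypothetical class, not Storm API.
class TupleTreeTracker {
    // XOR of every tuple id that has been emitted or acknowledged so far.
    private long ackVal = 0;

    // A tuple with this id was emitted as part of the tree.
    void emitted(long tupleId) {
        ackVal ^= tupleId;
    }

    // The tuple with this id was acknowledged.
    void acked(long tupleId) {
        ackVal ^= tupleId;
    }

    // Each id is XORed in once on emit and once on ack, so the value returns to 0
    // when every emitted tuple has been acknowledged. This relies on ids being
    // effectively random 64-bit values, so that unrelated ids do not cancel out.
    boolean fullyProcessed() {
        return ackVal == 0;
    }
}
```

Replaying the example: after the spout emits t1, and after b1 emits t2 and acknowledges t1, the value is still non-zero. Only the acknowledgement of t2 brings it back to 0.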


Java bolts can extend one of the following two important classes:
  • BaseBasicBolt - BaseBasicBolt acknowledges tuples automatically for you, so you only need to focus on the processing logic.
  • BaseRichBolt - BaseRichBolt, on the other hand, requires you to acknowledge tuples explicitly. This can be helpful in scenarios where acknowledgements are not needed for tuples (tuples emitted without a message id).

Here is a sample bolt that appends exclamation marks to the words emitted by the spout and emits the result as a tuple for downstream bolts -

storm.starter.ExclamationBolt
package storm.starter;

import java.util.Collections;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class ExclamationBolt extends BaseBasicBolt{

  private static final long serialVersionUID = 1L;

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    collector.emit(Collections.singletonList((Object) (input.getString(0) + "!!!")));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}


Since this bolt extends BaseBasicBolt, it does not explicitly acknowledge tuples in its execute method.

Streams
Streams are unbounded sequences of tuples, where a tuple is a unit of data. A stream of tuples flows from a spout to bolt(s) or from bolt(s) to other bolt(s). Apache Storm provides various stream grouping techniques to let you define how the data should flow in a topology. Here are some of the stream grouping techniques:
  1. Shuffle grouping - Distributes tuples randomly across all the available bolt tasks so that each task receives a roughly equal share.
  2. Fields grouping - Makes sure that tuples with the same value for the grouping field go to the same bolt task. For example, if the stream is grouped by the "word" field, tuples with the same "word" value will always go to the same bolt task.
  3. Partial Key grouping - The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but tuples for each key are load balanced between two downstream bolt tasks, which provides better utilization of resources when the incoming data is skewed.
  4. All grouping - The stream is replicated to all of the downstream bolt tasks, so every task receives every tuple.
  5. Global grouping - The entire stream goes to a single downstream bolt task. Use this with caution, as it serializes the processing of tuples and can slow processing down.
  6. None grouping - Indicates that you don't care how the stream is grouped and are happy with the default. Currently, the default is shuffle grouping, but this may change in future releases, so use it carefully.
  7. Direct grouping - The producer of a tuple decides which task of the consumer will receive the emitted tuple. This is only applicable to streams declared as direct streams.
  8. Local or shuffle grouping - If the target bolt has one or more tasks in the same worker process, tuples are shuffled to just those in-process tasks. Otherwise, this acts as a normal shuffle grouping.
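
As a rough illustration of how a grouping decides which task receives a tuple, here is a plain-Java model of three of the groupings above. It is not Storm API: the class GroupingSketch is hypothetical, and using hashCode modulo the task count for the fields grouping is an assumption made for this sketch rather than a description of Storm's exact hashing.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of how three groupings pick target tasks; hypothetical, not Storm API.
class GroupingSketch {
    // Fields grouping: the same field value always maps to the same task index.
    // (hashCode modulo the task count is an assumption for this sketch.)
    static int fieldsGrouping(Object fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // All grouping: every task receives a copy of the tuple.
    static List<Integer> allGrouping(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    // Global grouping: the whole stream goes to one task (here, the lowest index).
    static int globalGrouping(int numTasks) {
        return 0;
    }
}
```

The useful property to notice is determinism: fieldsGrouping returns the same index every time for the same value, which is what lets a bolt task keep per-key state such as a running count per word.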

A new stream grouping technique can be developed by implementing the backtype.storm.grouping.CustomStreamGrouping interface.

Topologies
A topology in Storm represents the graph of computation and is implemented as a DAG (Directed Acyclic Graph) data structure. Each node of this graph contains data processing logic (spouts and bolts), while the connecting edges define the flow of data (streams). Storm keeps a topology running forever, until you kill it.

Here is a sample topology called ExclamationTopology that appends exclamation marks twice to the words emitted by TestWordSpout, by chaining two ExclamationBolt instances:
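
The graph view of a topology can be sketched in plain Java. This is a model only, not Storm API: the class TopologyGraph and its methods are hypothetical. It records which component consumes which stream and uses Kahn's algorithm to list components in data-flow order, rejecting graphs that contain a cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a topology graph; hypothetical class, not Storm API.
class TopologyGraph {
    // Component id -> ids of the components that subscribe to its stream.
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();

    void addComponent(String id) {
        downstream.putIfAbsent(id, new ArrayList<>());
    }

    // Declares that 'to' consumes the stream emitted by 'from'.
    void connect(String from, String to) {
        addComponent(from);
        addComponent(to);
        downstream.get(from).add(to);
    }

    // Kahn's algorithm: returns the components in data-flow order,
    // or throws if the graph contains a cycle.
    List<String> flowOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String id : downstream.keySet()) {
            inDegree.put(id, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String target : downstream.get(id)) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != downstream.size()) {
            throw new IllegalStateException("topology graph contains a cycle");
        }
        return order;
    }
}
```

For a spout feeding two chained bolts, flowOrder lists the spout first and the last bolt last; components with no incoming edge play the role of spouts in this model.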

storm.starter.ExclamationTopology
package storm.starter;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class ExclamationTopology {

  /**
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

      builder.setSpout("word", new TestWordSpout(), 10);
      builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
      builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

      Config conf = new Config();
      StormSubmitter.submitTopology("ExclamationTopology", conf, builder.createTopology());
  }

}
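
To make the data flow of this topology concrete without a cluster, here is a plain-Java walk-through. It is not Storm API and ignores parallelism and grouping: the class ExclamationFlowSketch is hypothetical and simply applies the same transformation as ExclamationBolt twice, once per bolt in the chain.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java walk-through of the data flow; hypothetical class, not Storm API.
class ExclamationFlowSketch {
    // Mirrors what ExclamationBolt does to the first field of a tuple.
    static String exclaim(String word) {
        return word + "!!!";
    }

    // Models the chain: word spout -> exclaim1 -> exclaim2.
    static List<String> run(List<String> spoutWords) {
        List<String> results = new ArrayList<>();
        for (String word : spoutWords) {
            String afterExclaim1 = exclaim(word);
            String afterExclaim2 = exclaim(afterExclaim1);
            results.add(afterExclaim2);
        }
        return results;
    }
}
```

In this model, the word "nathan" leaves the second bolt as "nathan!!!!!!", because each of the two bolts appends "!!!".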



Once you are done writing the spouts, bolts and topology, you need to build a jar file containing all of them. Here is the command that you can use to run the exclamation topology in Apache Storm:

storm.starter.ExclamationTopology
$ storm jar exclamation-topology-jar-file.jar storm.starter.ExclamationTopology

Introduction to Apache Storm

This article introduces you to Apache Storm, a real-time distributed processing / computing framework for big data, by providing details of its technical architecture along with the use cases it could be utilized in.

Abstract
---------------------------
Apache Storm, in simple terms, is a distributed framework for real-time processing of Big Data, just as Apache Hadoop is a distributed framework for batch processing. Apache Storm works on the task parallelism principle, wherein the same code is executed on multiple nodes with different input data.
Apache Storm does not have any state managing capabilities of its own. Instead, it utilizes Apache ZooKeeper to manage its cluster state, such as message acknowledgements and processing status. This enables Storm to resume right where it left off, even after a restart.
Since Storm's master node (called Nimbus) is a Thrift service, one can create and submit a processing logic graph (called a topology) in any programming language. Moreover, it is scalable, fault-tolerant and guarantees that input data will be processed.

Technical Architecture
--------------------------------------------------------
Here is the architecture diagram depicting the technical architecture of Apache Storm -



The above diagram shows the following two types of node services -
  1. Nimbus Service on Master Node - Nimbus is a daemon that runs on the master node of a Storm cluster. It is responsible for distributing the code among the worker nodes, assigning input data sets to machines for processing and monitoring for failures. The Nimbus service is an Apache Thrift service, enabling you to submit the code in any programming language. This way, you can always use the language that you are proficient in, without needing to learn a new language to use Apache Storm.
    The Nimbus service relies on the Apache ZooKeeper service to monitor the message processing tasks, as all the worker nodes update their task status in Apache ZooKeeper.
  2. Supervisor Service on Worker Node - All the worker nodes in a Storm cluster run a daemon called Supervisor. The Supervisor service receives the work assigned to a machine by the Nimbus service. The Supervisor manages worker processes to complete the tasks assigned by Nimbus. Each of these worker processes executes a subset of a topology, which we will talk about next.
And here are the important high-level components that we have in each Supervisor node.
  1. Topology - A topology, in simple terms, is a graph of computation. Each node in a topology contains processing logic, and links between nodes indicate how data should be passed around between nodes. A topology typically runs in a distributed manner, on multiple worker processes across multiple worker nodes.
  2. Spout - A topology starts with a spout, a source of streams. A stream is made of an unbounded sequence of tuples. A spout may read tuples off a messaging framework and emit them as a stream of messages, or it may connect to the Twitter API and emit a stream of tweets. In the above technical architecture diagram, a topology is shown with two spouts (sources of streams).
  3. Bolt - A bolt represents a node in a topology. It defines the smallest unit of processing logic within a topology. The output of a bolt can be fed into another bolt as input in a topology. In the above technical architecture diagram, a topology is shown with five bolts to process the data coming from two spouts.

Benefits
--------------------------
Apache Storm comes with the following benefits to provide your application with a robust real-time processing engine -
  1. Scalable - Storm scales to massive numbers of messages per second. To scale a topology, all you have to do is add machines and increase the parallelism settings of the topology. As an example of Storm's scale, one of Storm's initial applications processed 1,000,000 messages per second on a 10 node cluster, including hundreds of database calls per second as part of the topology. Storm's usage of ZooKeeper for cluster coordination makes it scale to much larger cluster sizes.
  2. Guarantees no data loss - A realtime system must have strong guarantees about data being successfully processed. A system that drops data has a very limited set of use cases. Storm guarantees that every message will be processed, and this is in direct contrast with other systems like S4.
  3. Extremely robust - Unlike systems like Hadoop, which are notorious for being difficult to manage, Storm clusters just work. It is an explicit goal of the Storm project to make the user experience of managing Storm clusters as painless as possible.
  4. Fault-tolerant - If there are faults during execution of your computation, Storm will reassign tasks as necessary. Storm makes sure that a computation can run forever (or until you kill the computation).
  5. Programming language agnostic - Robust and scalable realtime processing shouldn't be limited to a single platform. Storm topologies and processing components can be defined in any language, making Storm accessible to nearly anyone.

Possible Use Cases
--------------------------
Apache Storm, being a real-time processing framework, is suitable for an extremely broad set of use cases, some of which are as follows -
  • Stream processing - Processing messages and updating databases
  • Continuous computation - Doing a continuous query on data streams and streaming the results into clients
  • Distributed RPC - Parallelizing an intense query like a search query on the fly
  • Real-time analytics
  • Online machine learning
You can find information about which companies are using Apache Storm here