Sunday 11 October 2015

Spouts, Bolts, Streams and Topologies in Apache Storm

This tutorial describes the core Apache Storm primitives: spouts, bolts, streams (with their stream groupings) and topologies.

Abstract
Apache Storm is a free and open source distributed real-time computation system that is scalable, reliable and easy to set up and maintain. An Apache Storm cluster is made up of two types of processes: Nimbus and Supervisor. Nimbus runs on the master node and is responsible for distributing work across the cluster and tracking the progress of data processing, while a Supervisor process runs on each worker node and is responsible for executing the data processing logic.

For a more detailed picture of an Apache Storm cluster, see the tutorial Introduction to Apache Storm.

In this tutorial, we will discuss the following Apache Storm primitives:
  • Spouts
  • Bolts
  • Streams
  • Topologies

Spouts
Spouts represent the source of data in Storm. You can write spouts to read data from sources such as databases, distributed file systems and messaging frameworks. Spouts can broadly be classified as follows:
  • Reliable - These spouts can replay tuples (a tuple is a unit of data in a stream). This helps applications achieve 'at least once' message processing semantics: in case of failure, tuples can be replayed and processed again. Spouts that fetch data from messaging frameworks are generally reliable, as these frameworks provide a mechanism to replay messages.
  • Unreliable - These spouts cannot replay tuples. Once a tuple is emitted, it cannot be replayed, irrespective of whether it was processed successfully or not. Spouts of this type follow 'at most once' message processing semantics.
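To make the difference concrete, here is a minimal, Storm-free sketch of the bookkeeping a reliable source needs: it remembers every emitted message under a message id until it is acknowledged, and re-queues it on failure. The class ReplayingSourceSketch and its methods are hypothetical names chosen for illustration; they are not part of the Storm API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, Storm-free model of a reliable source; not a real Storm spout.
class ReplayingSourceSketch {
    private final Queue<String> toEmit = new ArrayDeque<>();
    // Messages that were emitted but not yet acknowledged, keyed by message id.
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextMsgId = 0;

    ReplayingSourceSketch(List<String> messages) {
        toEmit.addAll(messages);
    }

    // Emits the next message into 'out' and returns its message id, or -1 if idle.
    int emitNext(List<String> out) {
        String msg = toEmit.poll();
        if (msg == null) {
            return -1;
        }
        int id = nextMsgId++;
        pending.put(id, msg);
        out.add(msg);
        return id;
    }

    // Success: the message can be forgotten.
    void ack(int msgId) {
        pending.remove(msgId);
    }

    // Failure: put the message back in the queue so it is emitted again.
    void fail(int msgId) {
        String msg = pending.remove(msgId);
        if (msg != null) {
            toEmit.add(msg);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplayingSourceSketch source = new ReplayingSourceSketch(Arrays.asList("a", "b"));
        List<String> emitted = new ArrayList<>();
        int first = source.emitNext(emitted);   // emits "a"
        int second = source.emitNext(emitted);  // emits "b"
        source.fail(first);                     // "a" failed downstream
        source.ack(second);                     // "b" was processed
        int replay = source.emitNext(emitted);  // "a" is emitted a second time
        source.ack(replay);
        System.out.println(emitted);            // prints [a, b, a]
    }
}
```

An unreliable source would simply skip the pending map: with nothing remembered, there is nothing to replay, which is why it can only offer 'at most once' semantics.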

The diagram below shows the spout hierarchy along with some important spouts that come bundled with Apache Storm. BaseRichSpout is an important class, and your Java spouts should generally extend it.



As we can see, some useful spouts are ready to be used. For example, KafkaSpout can be used to read messages off Kafka topics.


Here is a sample spout that emits random words:

backtype.storm.testing.TestWordSpout
package backtype.storm.testing;

import backtype.storm.Config;
import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class TestWordSpout extends BaseRichSpout {
    public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
    boolean _isDistributed;
    SpoutOutputCollector _collector;

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }
        
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }
    
    public void close() {
        
    }
        
    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }
    
    public void ack(Object msgId) {

    }

    public void fail(Object msgId) {
        
    }
    
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        if(!_isDistributed) {
            Map<String, Object> ret = new HashMap<String, Object>();
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
            return null;
        }
    }    
}

Some of the important methods in the above spout definition are open (used for initialization work), nextTuple (called by Storm to get the next tuple) and declareOutputFields (used to declare the field names of the emitted tuples).

Bolts
Bolts represent the processing logic unit in Storm. One can utilize bolts to do any kind of processing such as filtering, aggregating, joining, interacting with data stores, talking to external systems etc.

Like spouts, bolts can also emit tuples (data messages) for subsequent bolts to process. Additionally, bolts are responsible for acknowledging tuples once they are done processing them. A spout considers a tuple fully processed when it has received acknowledgement for that tuple as well as for all the tuples emitted by bolts while processing that original tuple.

For example, a spout emits a tuple t1 that goes to bolt b1 for processing. Bolt b1 processes t1, emits another tuple t2 and acknowledges the processing of tuple t1. At this point, even though tuple t1 has been acknowledged, the spout will not consider it fully processed, because tuple t2, which was emitted as part of its processing, is still not acknowledged. Tuple t2 goes to bolt b2 for processing and gets acknowledged. As a result, tuple t1 is now considered fully processed by the spout.
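The t1/t2 walk-through above can be modelled with very little state. The following sketch is in the spirit of the XOR-based tracking that Storm's documentation on guaranteed message processing describes: every tuple id is XORed into a single value once when the tuple is emitted and once when it is acknowledged, so the value returns to zero only when every tuple in the tree has been acknowledged. The class TupleTreeSketch and the tuple ids are hypothetical and serve illustration only; this is not Storm's actual acker code.

```java
// Hypothetical sketch of tuple-tree tracking; not Storm's actual acker code.
class TupleTreeSketch {
    // XOR of the ids of all tuples that were emitted but not yet acknowledged.
    private long ackVal = 0L;

    // Called when a tuple is emitted as part of this tree.
    void anchored(long tupleId) {
        ackVal ^= tupleId;
    }

    // Called when a bolt acknowledges a tuple of this tree.
    void acked(long tupleId) {
        ackVal ^= tupleId;
    }

    // The tree is complete when every emitted id has been cancelled by its ack.
    boolean fullyProcessed() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        long t1 = 0x1234L; // arbitrary non-zero ids chosen for the example
        long t2 = 0x9abcL;

        TupleTreeSketch tree = new TupleTreeSketch();
        tree.anchored(t1);  // the spout emits t1
        tree.anchored(t2);  // bolt b1 emits t2 while processing t1
        tree.acked(t1);     // bolt b1 acknowledges t1
        System.out.println(tree.fullyProcessed()); // prints false: t2 is outstanding
        tree.acked(t2);     // bolt b2 acknowledges t2
        System.out.println(tree.fullyProcessed()); // prints true
    }
}
```

Note that t2 is registered before t1 is acknowledged. If the order were reversed, the value would briefly hit zero and the tree would look complete too early.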

The diagram below shows the bolt hierarchy of Apache Storm.


Java bolts typically extend one of the following two classes:
  • BaseBasicBolt - BaseBasicBolt acknowledges tuples automatically for you, so you only need to focus on the processing logic.
  • BaseRichBolt - BaseRichBolt, on the other hand, requires you to acknowledge tuples explicitly. This can be helpful when you need control over when a tuple is acknowledged, or in scenarios where acknowledgements are not needed for tuples (tuples emitted without a message id).

Here is a sample bolt that appends exclamation marks to the words emitted by the spout and emits the result as a new tuple for downstream bolts:

storm.starter.ExclamationBolt
package storm.starter;

import java.util.Collections;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class ExclamationBolt extends BaseBasicBolt{

  private static final long serialVersionUID = 1L;

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    collector.emit(Collections.singletonList((Object) (input.getString(0) + "!!!")));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

}


Since this bolt extends BaseBasicBolt, it does not explicitly acknowledge tuples in its execute method.
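One way to picture automatic acknowledgement is as a thin wrapper around the processing logic: the wrapper runs the logic, acknowledges the input if it returns normally, and marks the input as failed if it throws. The sketch below models that idea in plain Java under those assumptions. AutoAckSketch and its fields are hypothetical names, not Storm classes, and the real mechanism inside Storm may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, Storm-free model of automatic acknowledgement;
// none of these names are Storm APIs.
class AutoAckSketch {
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    // Runs the processing logic, then acks on success or fails on an exception,
    // so the logic itself never has to call ack.
    String execute(String tuple, UnaryOperator<String> logic) {
        try {
            String result = logic.apply(tuple);
            acked.add(tuple);
            return result;
        } catch (RuntimeException e) {
            failed.add(tuple);
            return null;
        }
    }

    public static void main(String[] args) {
        AutoAckSketch runner = new AutoAckSketch();
        // Logic that succeeds: the input is acknowledged automatically.
        System.out.println(runner.execute("storm", w -> w + "!!!")); // prints storm!!!
        // Logic that throws: the input is marked as failed instead.
        runner.execute("boom", w -> {
            throw new IllegalStateException("processing failed");
        });
        System.out.println(runner.acked);  // prints [storm]
        System.out.println(runner.failed); // prints [boom]
    }
}
```

A bolt that manages acknowledgements itself would, in this model, receive the tuple and decide on its own when to add it to the acked or failed list.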

Streams
A stream is an unbounded sequence of tuples, where a tuple is a unit of data. A stream of tuples flows from a spout to bolt(s) or from bolt(s) to other bolt(s). Apache Storm provides various stream grouping techniques that let you define how the data should flow in a topology. Here are some of the stream grouping techniques:
  1. Shuffle grouping - Tuples are distributed randomly across all the available bolt tasks such that each task receives roughly an equal number of tuples.
  2. Fields grouping - This type of grouping makes sure that tuples with the same value for the grouping field go to the same bolt task. For example, if the stream is grouped by the "word" field, tuples with the same "word" value will always go to the same bolt task.
  3. Partial Key grouping - The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but is load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed.
  4. All grouping - The stream is replicated to all of the downstream bolt tasks, so every task receives every tuple.
  5. Global grouping - The entire stream goes to a single downstream bolt task (the task with the lowest id). Use this with caution, as it serializes the processing of tuples and can slow processing down.
  6. None grouping - This grouping indicates that you don't care how the stream is grouped and are happy with the default. Currently, the default is equivalent to shuffle grouping, but this may change in future releases, so it should be used carefully.
  7. Direct grouping - In this grouping, the producer of a tuple decides which task of the consumer will receive the emitted tuple. This is only applicable to streams declared as direct streams.
  8. Local or shuffle grouping - If the target bolt has one or more tasks in the same worker process, tuples are shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
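At its core, a grouping is a function from a tuple to the index (or indices) of the downstream task(s) that should receive it. The sketch below shows plausible task-selection rules for four of the groupings listed above. It assumes tasks are numbered 0 to numTasks - 1 and uses a plain hash for the fields grouping and a plain random pick for the shuffle grouping. The class GroupingSketch and these rules are illustrative assumptions, not Storm's own implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical task-selection rules; illustrative only, not Storm's implementation.
class GroupingSketch {

    // Fields grouping: the same key always maps to the same task index.
    static int fieldsGrouping(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Shuffle grouping: pick any task at random.
    static int shuffleGrouping(Random rand, int numTasks) {
        return rand.nextInt(numTasks);
    }

    // All grouping: every task receives a copy of the tuple.
    static List<Integer> allGrouping(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            targets.add(task);
        }
        return targets;
    }

    // Global grouping: the whole stream goes to one task (the lowest index here).
    static int globalGrouping(int numTasks) {
        return 0;
    }

    public static void main(String[] args) {
        int numTasks = 3;
        // The two calls below always agree, whatever the chosen index is.
        System.out.println(fieldsGrouping("nathan", numTasks) == fieldsGrouping("nathan", numTasks));
        System.out.println(allGrouping(numTasks));    // prints [0, 1, 2]
        System.out.println(globalGrouping(numTasks)); // prints 0
    }
}
```

The fields rule also shows why skewed data is a problem: if one key dominates the stream, the single task it hashes to receives most of the tuples, which is the situation partial key grouping is meant to ease.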

A new stream grouping technique can be developed by implementing the backtype.storm.grouping.CustomStreamGrouping interface.

Topologies
A topology in Storm represents the graph of computation and is modeled as a directed acyclic graph (DAG). Each node of this graph contains data processing logic (spouts and bolts), while the connecting edges define the flow of data (streams). Storm keeps the topology running forever until you kill it.

Here is a sample topology called ExclamationTopology that passes the words emitted by TestWordSpout through two chained ExclamationBolt stages, each of which appends exclamation marks:

storm.starter.ExclamationTopology
package storm.starter;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;

public class ExclamationTopology {

  /**
   @param args
   */
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

      builder.setSpout("word", new TestWordSpout(), 10);
      builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
      builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

      Config conf = new Config();
      StormSubmitter.submitTopology("ExclamationTopology", conf, builder.createTopology());
  }

}
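The wiring in the topology above forms a small graph: "word" feeds "exclaim1", which feeds "exclaim2". As a rough, Storm-free illustration of the DAG view, the sketch below stores those component ids in an adjacency list and uses Kahn's algorithm to derive an upstream-to-downstream order, rejecting graphs that contain a cycle. TopologyDagSketch is a hypothetical helper written for this example; Storm's TopologyBuilder does not expose such a class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a topology as a DAG; not part of the Storm API.
class TopologyDagSketch {
    // Component id -> ids of the components that consume its stream.
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();

    void addComponent(String id) {
        downstream.putIfAbsent(id, new ArrayList<>());
    }

    // Adds a stream flowing from one component to another.
    void connect(String from, String to) {
        addComponent(from);
        addComponent(to);
        downstream.get(from).add(to);
    }

    // Kahn's algorithm: returns the components ordered from sources to sinks,
    // or throws if the graph contains a cycle.
    List<String> processingOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String id : downstream.keySet()) {
            inDegree.put(id, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            for (String target : downstream.get(id)) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != downstream.size()) {
            throw new IllegalStateException("cycle detected");
        }
        return order;
    }

    public static void main(String[] args) {
        TopologyDagSketch dag = new TopologyDagSketch();
        dag.connect("word", "exclaim1");
        dag.connect("exclaim1", "exclaim2");
        System.out.println(dag.processingOrder()); // prints [word, exclaim1, exclaim2]
    }
}
```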



Once you are done writing the spouts, bolts and topology, you need to build a jar file containing all of them. You can then use the following command to run the exclamation topology in Apache Storm:

storm.starter.ExclamationTopology
$ storm jar exclamation-topology-jar-file.jar storm.starter.ExclamationTopology
