Monday, April 18, 2016

A simple Apache Storm tutorial [Part1]

Apache Storm is actually well documented. Problem is, you won't understand any of it until you actually try out some code (even if it's in the form of a nice explanation by Chandan Prakash), and there's a dearth of simple code available on the internet. NRecursions comes to your rescue :-)

To run this program, you can either do it with Gradle (which I've used mainly so that the jar dependency management would automatically be handled by Gradle) or you could simply create a normal Java project and manually add the necessary jar's. The jar's you'll need are:
  • asm-5.0.3.jar
  • bson4jackson-2.7.0.jar
  • clojure-1.7.0.jar
  • disruptor-3.3.2.jar
  • kryo-3.0.3.jar
  • log4j-api-2.1.jar
  • log4j-core-2.1.jar
  • log4j-over-slf4j-1.6.6.jar
  • log4j-slf4j-impl-2.1.jar
  • logback-classic-1.1.3.jar
  • logback-core-1.1.3.jar
  • logback-core-1.1.3.jar
  • minlog-1.3.0.jar
  • objeneiss-2.1.jar
  • reflectasm-1.10.1.jar
  • servlet-api-2.5.jar
  • slf4j-api-1.7.12.jar
  • storm-core-1.0.0.jar

Create a Gradle project named "BasicStorm" and create a source package named "com.sdint.basicstorm".

Within that package, create

package com.sdint.basicstorm;

import org.apache.storm.Config;

import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicStorm {

 public static void main(String[] cmdArgs) {

 Config config = new Config();
 //config.put(Config.TOPOLOGY_DEBUG, false);
 config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
 config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30 second of tuple timeout to 10 second

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout("myDataSpout", new DataSpout());

 builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");

 LocalCluster localCluster = new LocalCluster();
 localCluster.submitTopology("BasicStorm", config, builder.createTopology());

 System.out.println("\n\n\nTopology submitted\n\n\n");
 pause(120);//pause for 120 seconds during which the emitting of tuples will happen


 public static void pause(int timeToPause_InSeconds) {
    try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
    catch (InterruptedException e) {System.out.println(e.getCause());}



package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
 private TopologyContext context;
 private SpoutOutputCollector collector;

 private final Logger logger = LoggerFactory.getLogger(DataSpout.class);

 public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
 this.context = tc;
 this.collector = soc;

 System.out.println("\n\n\nopen of DataSpout\n\n\n");

 public DataSpout() {
 System.out.println("\n\n\nDataSpout constructor called\n\n\n");

 public void declareOutputFields(OutputFieldsDeclarer ofd) {
 System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");

 ofd.declare(new Fields("line"));

 public void nextTuple() {
 System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");

 Long newTupleValue =System.currentTimeMillis() % 1000;

 this.collector.emit(new Values(newTupleValue), newTupleValue);
 System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");

 public void pause(int timeToPause_InSeconds) {
    try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
    catch (InterruptedException e) {System.out.println(e.getCause());}}


package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ProcessingBolt extends BaseRichBolt {
 private OutputCollector collector;

 public void declareOutputFields(OutputFieldsDeclarer ofd) {
 System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");

 public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
 System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
 collector = oc;

 public void execute(Tuple tuple) {
 System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");


Your build.gradle file would look like this (if you choose to create a Gradle project. You won't need this if you're creating a simple Java project):

apply plugin: 'java'
apply plugin: 'eclipse'

defaultTasks 'jar'

jar {
 from {
        (configurations.runtime).collect {
            it.isDirectory() ? it : zipTree(it)
    manifest {
        attributes 'Main-Class': 'com.sdint.basicstorm.BasicStorm'

sourceCompatibility = '1.8'
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

if (!hasProperty('mainClass')) {
    ext.mainClass = 'com.sdint.basicstorm.BasicStorm'

repositories {

dependencies { 
    //---apache storm
    compile 'org.apache.storm:storm-core:1.0.0'  //compile 'org.apache.storm:storm-core:0.10.0'
    compile "org.slf4j:slf4j-api:1.7.12"
    compile 'ch.qos.logback:logback-classic:1.1.3'
    compile 'ch.qos.logback:logback-core:1.1.3'   
    testCompile group: 'junit', name: 'junit', version: '4.10'


Hope you've already read a little about how the Spout's and Bolt's work in Storm.

This is what happens in BasicStorm:

  • The moment a Spout or Bolt is instantiated, its declareOutputFields function gets called. For our simple program we don't need to know what it is, so let's ignore it for now.
  • When the topology is submitted to Storm, the open function gets called for Spouts, and this function gets called only once for a Spout instance.
  • For Bolts, the equivalent of open is the prepare function which gets called. You can use open and prepare to do initializations, declarations etc. for your program.
  • After submitting the topology, we pause for a while in main() (pause(120);) so that Storm would get time to run the topology. This is the time during which Storm calls nextTuple() of the Spout, and when the Spout emits a Tuple, the Tuple is sent to the Bolt (because in main() we configured the Bolt to receive Tuples from the Spout. See the line builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");). 
  • When the Bolt receives the value, the execute() function of the Bolt is called.
  • BasicStorm is designed for a simple task. DataSpout emits a Long value (it's a Tuple), ProcessingBolt receives it and ProcessingBolt acknowledges (the collector.ack(tuple); line) that it has received the Long value and that the data processing for the tuple is complete.
  • When DataSpout receives the acknowledgement, it calls nextTuple() again and another tuple gets emitted.
  • This process keeps going on for the 120 seconds we have paused the main() thread for. After that, the topology shuts down.

Try tweaking the values here and there to find out how the program works. Try substuting some other value in place of the Long for the Tuple. Try substituting it with a class object. Remember that if you substitute it like that, you'll have to extend that class with Serializable.

One more thing to try out:
Try commenting out collector.ack(tuple); in You'll basically not be telling the Spout that the tuple has been received by the Bolt. So after some time, the Spout will emit the tuple again. The interval of time the Spout waits for an acknowledgement (ack) is normally 30 seconds, but if you scroll up, you'll see that in main() we had set the time to 10 seconds (config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);), so the Spout will wait just 10s before emitting the tuple again.

Continued in Part 2

No comments: