18 April 2016

A simple Apache Storm tutorial [Part 2: Implementing failsafes]


Continued from Part 1


If you really want to understand what the Values class is, what the Tuple class is and so on, the best place to look is not the tutorials on the internet. Look at the actual Storm source code.
It's available here: https://github.com/apache/storm
Go into the "storm-core/src/jvm/org/apache/storm" folder and have a look at those Java files. The code is very simple to understand, and I promise you, it will be an enlightening experience.
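
If you'd rather browse it locally, clone the repository first:

git clone https://github.com/apache/storm.git
cd storm/storm-core/src/jvm/org/apache/storm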

Now, onto the ack and fail aspects of Storm.

Given below is the exact same program as in Part 1 of this tutorial. The additions, and the sections that need your attention, are the tupleAck and oldTupleValue fields, the new replay logic in nextTuple(), and the ack() and fail() functions of DataSpout.


BasicStorm.java:

package com.sdint.basicstorm;

import org.apache.storm.Config;

import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicStorm {

    public static void main(String[] cmdArgs) {
       
        Config config = new Config();
        //config.put(Config.TOPOLOGY_DEBUG, false);
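        //allow at most one pending (unacked) tuple from the spout at a time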
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30-second tuple timeout to 10 seconds
       
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("myDataSpout", new DataSpout());
       
        builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");
       
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BasicStorm", config, builder.createTopology());
       
        System.out.println("\n\n\nTopology submitted\n\n\n");
        pause(120);//pause for 120 seconds during which the emitting of tuples will happen
       
        //localCluster.killTopology("BasicStorm");
        localCluster.shutdown();
    }//main


    public static void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
        catch (InterruptedException e) {System.out.println(e.getCause());}
    }
 }//class


DataSpout.java:

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
   
    //---logger
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
   
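    //---state for the replay scheme: remember the last emitted value and whether it was acked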
    private boolean tupleAck = true;
    private Long oldTupleValue;
   
   
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;
       
        System.out.println("\n\n\nopen of DataSpout\n\n\n");      
    }
   
    public DataSpout() {
        System.out.println("\n\n\nDataSpout ctor called\n\n\n");
    }//ctor

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");
       
        ofd.declare(new Fields("line"));
    }

    @Override
    public void nextTuple() {
        System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");
       
        Long newTupleValue;
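        //if the previous tuple was acked, generate a fresh value; otherwise replay the old one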
        if (tupleAck) {
            newTupleValue = System.currentTimeMillis() % 1000;
            oldTupleValue = newTupleValue;
        }
        else {newTupleValue = oldTupleValue;}

       
        this.collector.emit(new Values(newTupleValue), newTupleValue);
        System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
        pause(1);
    }
   
    @Override
    public void ack(Object msgId) {
        System.out.println("\n\n\nAck received for DataSpout"+msgId+"\n\n\n");
        tupleAck = true;
    }   
   
    @Override
    public void fail(Object msgId) {
        System.out.println("\n\n\nFailed tuple msgID: "+msgId+"\n\n\n");
        //replay logic should be here
        tupleAck = false;
    }

 

    public void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
        catch (InterruptedException e) {System.out.println(e.getCause());}
    }
    
}//class



ProcessingBolt.java:

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ProcessingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
        collector = oc;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
        collector.ack(tuple);
    }

   
}



Notice that this time when you run the program, the ack function in the Spout gets called whenever the Bolt executes the collector.ack(tuple); statement.

But suppose you comment out collector.ack(tuple);. Then after a certain time period (normally 30 seconds, but in our program we set it to 10 seconds), the fail function gets called instead.

This is how the Spout (and we) can tell whether or not a tuple has been received and acknowledged by the Bolt. The above program basically uses the system time as a tuple, and if the Bolt does not acknowledge that it has received the tuple, the Spout sends the same old tuple to the Bolt again.
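
What makes all this tracking work is the second argument we pass to emit() in nextTuple(): the message ID. Here is a minimal sketch of the difference (reusing the collector and newTupleValue from DataSpout above):

//anchored emit: passing a message ID as the second argument tells Storm to
//track this tuple and to call the spout's ack(msgId) or fail(msgId) for it later
collector.emit(new Values(newTupleValue), newTupleValue);

//plain emit: no message ID, so the tuple is fire-and-forget and neither
//ack() nor fail() will ever be called for it
collector.emit(new Values(newTupleValue));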




And before getting into hardcore Storm programming, there is one more important thing:

Apache Storm concepts you really need to know.





A simple Apache Storm tutorial [Part1]

Apache Storm is actually well documented. The problem is, you won't understand any of it until you actually try out some code (even if it's in the form of a nice explanation by Chandan Prakash), and there's a dearth of simple code available on the internet. NRecursions comes to your rescue :-)

To run this program, you can either use Gradle (which I've used, mainly so that jar dependency management is handled automatically by Gradle) or you can simply create a normal Java project and manually add the necessary jars. The jars you'll need are:
  • asm-5.0.3.jar
  • bson4jackson-2.7.0.jar
  • clojure-1.7.0.jar
  • disruptor-3.3.2.jar
  • kryo-3.0.3.jar
  • log4j-api-2.1.jar
  • log4j-core-2.1.jar
  • log4j-over-slf4j-1.6.6.jar
  • log4j-slf4j-impl-2.1.jar
  • logback-classic-1.1.3.jar
  • logback-core-1.1.3.jar
  • minlog-1.3.0.jar
  • objenesis-2.1.jar
  • reflectasm-1.10.1.jar
  • servlet-api-2.5.jar
  • slf4j-api-1.7.12.jar
  • storm-core-1.0.0.jar

Create a Gradle project named "BasicStorm" and create a source package named "com.sdint.basicstorm".

Within that package, create BasicStorm.java
 
package com.sdint.basicstorm;

import org.apache.storm.Config;
import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicStorm {

    public static void main(String[] cmdArgs) {

        Config config = new Config();
        //config.put(Config.TOPOLOGY_DEBUG, false);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30-second tuple timeout to 10 seconds

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("myDataSpout", new DataSpout());

        builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BasicStorm", config, builder.createTopology());

        System.out.println("\n\n\nTopology submitted\n\n\n");
        pause(120);//pause for 120 seconds during which the emitting of tuples will happen

        //localCluster.killTopology("BasicStorm");
        localCluster.shutdown();
    }//main

    public static void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);}
        catch (InterruptedException e) {System.out.println(e.getCause());}
    }

}//class


and DataSpout.java
 
package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;

    //---logger
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);

    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;

        System.out.println("\n\n\nopen of DataSpout\n\n\n");
    }

    public DataSpout() {
        System.out.println("\n\n\nDataSpout constructor called\n\n\n");
    }//ctor

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");

        ofd.declare(new Fields("line"));
    }

    @Override
    public void nextTuple() {
        System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");

        Long newTupleValue = System.currentTimeMillis() % 1000;

        this.collector.emit(new Values(newTupleValue), newTupleValue);
        System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
        pause(1);
    }

    public void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);}
        catch (InterruptedException e) {System.out.println(e.getCause());}
    }
}//class


and ProcessingBolt.java
 
package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ProcessingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
        collector = oc;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
        collector.ack(tuple);
    }

}


Your build.gradle file would look like this (if you chose to create a Gradle project; you won't need it if you created a simple Java project):

apply plugin: 'java'
apply plugin: 'eclipse'

defaultTasks 'jar'

jar {
    from {
        (configurations.runtime).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
    manifest {
        attributes 'Main-Class': 'com.sdint.basicstorm.BasicStorm'
    }
}

sourceCompatibility = '1.8'
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

if (!hasProperty('mainClass')) {
    ext.mainClass = 'com.sdint.basicstorm.BasicStorm'
}

repositories {
    mavenCentral()
}

dependencies { 
    //---apache storm
    compile 'org.apache.storm:storm-core:1.0.0'  //compile 'org.apache.storm:storm-core:0.10.0'
    //---logging
    compile "org.slf4j:slf4j-api:1.7.12"
    compile 'ch.qos.logback:logback-classic:1.1.3'
    compile 'ch.qos.logback:logback-core:1.1.3'   
       
    testCompile group: 'junit', name: 'junit', version: '4.10'

}



Hopefully you've already read a little about how Spouts and Bolts work in Storm.

This is what happens in BasicStorm:

  • The moment a Spout or Bolt is instantiated, its declareOutputFields function gets called. For our simple program we don't need to know what it does, so let's ignore it for now.
  • When the topology is submitted to Storm, the open function gets called for Spouts. This function gets called only once for a Spout instance.
  • For Bolts, the equivalent of open is the prepare function. You can use open and prepare to do initializations, declarations etc. for your program.
  • After submitting the topology, we pause for a while in main() (pause(120);) to give Storm time to run the topology. This is the time during which Storm calls nextTuple() of the Spout, and when the Spout emits a tuple, the tuple is sent to the Bolt (because in main() we configured the Bolt to receive tuples from the Spout; see the line builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");).
  • When the Bolt receives the value, the execute() function of the Bolt is called.
  • BasicStorm is designed for a simple task: DataSpout emits a Long value (as a tuple), ProcessingBolt receives it and acknowledges (the collector.ack(tuple); line) that it has received the Long value and that the data processing for the tuple is complete.
  • When DataSpout receives the acknowledgement, Storm calls its nextTuple() again and another tuple gets emitted (we set TOPOLOGY_MAX_SPOUT_PENDING to 1, so only one unacked tuple can be in flight at a time).
  • This process keeps going for the 120 seconds we have paused the main() thread for. After that, the topology shuts down.

Try tweaking the values here and there to find out how the program works. Try substituting some other value in place of the Long for the tuple. Try substituting it with a class object. Remember that if you do, you'll have to make that class implement Serializable, as in the sketch below.
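
For example, here is a minimal sketch of such a class (SensorReading and its fields are hypothetical names, purely for illustration):

package com.sdint.basicstorm;

import java.io.Serializable;

//a hypothetical value class to emit as a tuple; implementing Serializable
//lets Storm serialize it when shipping it from the Spout to the Bolt
public class SensorReading implements Serializable {
    private final long timestamp;
    private final double value;

    public SensorReading(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    public long getTimestamp() {return timestamp;}
    public double getValue() {return value;}
}

In DataSpout's nextTuple() you would then emit something like this.collector.emit(new Values(new SensorReading(System.currentTimeMillis(), 42.0)), someMessageId); and read it back in the Bolt with tuple.getValue(0).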

One more thing to try out:
Try commenting out collector.ack(tuple); in ProcessingBolt.java. You'll basically not be telling the Spout that the tuple has been received by the Bolt, so after some time the tuple will be considered failed and can be emitted again. The interval of time the Spout waits for an acknowledgement (ack) is normally 30 seconds, but if you scroll up you'll see that in main() we set it to 10 seconds (config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);), so the Spout waits just ten seconds before the tuple times out. Part 2 shows how to actually replay the failed tuple.



Continued in Part 2




06 April 2016

A comfortable Git branching model

Git has a very flexible approach toward branching and workflows. The best I've seen yet.
Novices will probably be happy using the popular branching model Vincent Driessen has described on his webpage: http://nvie.com/posts/a-successful-git-branching-model/



I've learnt from and liked Vincent's work, but over the course of working with his branching model, I've found it more comfortable to use a slightly different one, especially when collaborating with other developers.



What I propose to be different...

...is simply that instead of using the master branch to maintain the "stable" version of the code, it is more comfortable to have the master branch be the branch that the entire team collaborates on.

A separate branch named "stable" (see, even the name is more appropriate than 'master') is created and also pushed to the remote/blessed repository, and that's the branch that keeps the stable version of your code, ready for release.
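
In day-to-day commands the model boils down to something like this (a sketch, assuming your remote is named origin):

git checkout master          # the branch the whole team collaborates on
git branch stable            # create the release branch off master
git push origin stable       # publish it to the remote/blessed repository

# ...and whenever master reaches a releasable state:
git checkout stable
git merge master             # bring stable up to the release point
git push origin stable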

It's not only about the comfort that the master branch affords you for collaborative development, i.e. simply typing "git pull" instead of "git pull origin development" and so on.

It's also the fact that you'll have to do fewer merges between branches, merges which could otherwise end up looking like this:


The above screenshot is from GitUp, one of the best Git clients I've seen. Once you do a merge, you can actually undo it too. (It's available only for macOS though. When I wanted to port it to Linux myself, I saw the developer's post saying he's used too much Objective-C code for the graphics to be able to port it.)