18 April 2016
A simple Apache Storm tutorial [Part 2: Implementing failsafes]
Continued from Part 1.
If you really want to understand what the Values class is, what the Tuple class is, and so on, the best place to look is not the tutorials on the internet. Look at the actual Storm source code.
It's available here: https://github.com/apache/storm
Go into the "storm-core/src/jvm/org/apache/storm" folder and have a look at those Java files. The code is very simple to understand, and I promise you it will be an enlightening experience.
Now, onto the ack and fail aspects of Storm.
Given below is the exact same program as in Part 1 of this tutorial. The added sections, and the sections that need your attention, are highlighted.
BasicStorm.java:
package com.sdint.basicstorm;
import org.apache.storm.Config;
import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
public class BasicStorm {
public static void main(String[] cmdArgs) {
Config config = new Config();
//config.put(Config.TOPOLOGY_DEBUG, false);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);//allow only one unacked tuple to be in flight at a time
config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30-second tuple timeout to 10 seconds
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("myDataSpout", new DataSpout());
builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("BasicStorm", config, builder.createTopology());
System.out.println("\n\n\nTopology submitted\n\n\n");
pause(120);//pause for 120 seconds during which the emitting of tuples will happen
//localCluster.killTopology("BasicStorm");
localCluster.shutdown();
}//main
public static void pause(int timeToPause_InSeconds) {
try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);}
catch (InterruptedException e) {System.out.println(e.getCause());}
}
}//class
DataSpout.java:
package com.sdint.basicstorm;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataSpout extends BaseRichSpout {
private TopologyContext context;
private SpoutOutputCollector collector;
//---logger
private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
private boolean tupleAck = true;//was the previous tuple acknowledged?
private Long oldTupleValue;//remembered so that a failed tuple can be replayed
@Override
public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
this.context = tc;
this.collector = soc;
System.out.println("\n\n\nopen of DataSpout\n\n\n");
}
public DataSpout() {
System.out.println("\n\n\nDataSpout ctor called\n\n\n");
}//ctor
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");
ofd.declare(new Fields("line"));
}
@Override
public void nextTuple() {
System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");
Long newTupleValue;
if (tupleAck) {//the previous tuple was acked, so generate a fresh value
newTupleValue = System.currentTimeMillis() % 1000;
oldTupleValue = newTupleValue;
}
else {newTupleValue = oldTupleValue;}//the previous tuple failed, so replay the old value
this.collector.emit(new Values(newTupleValue), newTupleValue);
System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
pause(1);
}
@Override
public void ack(Object msgId) {
System.out.println("\n\n\nAck received for DataSpout: "+msgId+"\n\n\n");
tupleAck = true;
}
@Override
public void fail(Object msgId) {
System.out.println("\n\n\nFailed tuple msgID: "+msgId+"\n\n\n");
//replay logic should be here
tupleAck = false;
}
public void pause(int timeToPause_InSeconds) {
try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);}
catch (InterruptedException e) {System.out.println(e.getCause());}
}
}//class
ProcessingBolt.java:
package com.sdint.basicstorm;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class ProcessingBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
}
@Override
public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
collector = oc;
}
@Override
public void execute(Tuple tuple) {
System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
collector.ack(tuple);
}
}
Notice that this time when you run the program, the ack function in the Spout gets called whenever the Bolt executes the collector.ack(tuple); statement.
But if you comment out collector.ack(tuple);, then after a certain time period (normally 30 seconds, but we set it to 10 seconds in our program) the fail function gets called instead.
This is how the Spout (and we) know whether a tuple has been received and acknowledged by the Bolt or not. The above program basically uses the system time as the tuple value, and if the Bolt does not acknowledge that it has received the tuple, the Spout sends the same old tuple to the Bolt again.
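In this program a single oldTupleValue variable is enough, because TOPOLOGY_MAX_SPOUT_PENDING is set to 1, so at most one tuple is ever in flight. If you allow more pending tuples, a common pattern (shown below as a minimal sketch, not part of the original program) is to remember every in-flight tuple in a map keyed by its message ID and re-emit it from fail():
//Sketch of a spout that tracks multiple in-flight tuples (assumes the same collector field and pause method as DataSpout)
private final java.util.Map<Long, Values> pending = new java.util.HashMap<>();
@Override
public void nextTuple() {
Long msgId = System.currentTimeMillis();//the message ID Storm will hand back to ack/fail
Values tuple = new Values(msgId % 1000);
pending.put(msgId, tuple);//remember the tuple until it is acked
this.collector.emit(tuple, msgId);
pause(1);
}
@Override
public void ack(Object msgId) {
pending.remove(msgId);//fully processed, so forget it
}
@Override
public void fail(Object msgId) {
Values tuple = pending.get(msgId);//look up the failed tuple
if (tuple != null) {this.collector.emit(tuple, msgId);}//replay it with the same message ID
}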
And before getting into hardcore Storm programming, there's one important thing:
Apache Storm concepts you really need to know.
A simple Apache Storm tutorial [Part 1]
Apache Storm is actually well documented. The problem is, you won't understand any of it until you actually try out some code (even if it's in the form of a nice explanation by Chandan Prakash), and there's a dearth of simple code available on the internet. NRecursions comes to your rescue :-)
To run this program, you can either use Gradle (which I've used, mainly so that jar dependency management is handled automatically) or simply create a normal Java project and add the necessary jars manually. The jars you'll need are:
- asm-5.0.3.jar
- bson4jackson-2.7.0.jar
- clojure-1.7.0.jar
- disruptor-3.3.2.jar
- kryo-3.0.3.jar
- log4j-api-2.1.jar
- log4j-core-2.1.jar
- log4j-over-slf4j-1.6.6.jar
- log4j-slf4j-impl-2.1.jar
- logback-classic-1.1.3.jar
- logback-core-1.1.3.jar
- minlog-1.3.0.jar
- objenesis-2.1.jar
- reflectasm-1.10.1.jar
- servlet-api-2.5.jar
- slf4j-api-1.7.12.jar
- storm-core-1.0.0.jar
Create a Gradle project named "BasicStorm" and create a source package named "com.sdint.basicstorm".
Within that package, create BasicStorm.java
package com.sdint.basicstorm;
import org.apache.storm.Config;
import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
public class BasicStorm {
public static void main(String[] cmdArgs) {
Config config = new Config();
//config.put(Config.TOPOLOGY_DEBUG, false);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);//allow only one unacked tuple to be in flight at a time
config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30-second tuple timeout to 10 seconds
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("myDataSpout", new DataSpout());
builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("BasicStorm", config, builder.createTopology());
System.out.println("\n\n\nTopology submitted\n\n\n");
pause(120);//pause for 120 seconds during which the emitting of tuples will happen
//localCluster.killTopology("BasicStorm");
localCluster.shutdown();
}//main
public static void pause(int timeToPause_InSeconds) {
try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);}
catch (InterruptedException e) {System.out.println(e.getCause());}
}
}//class
and DataSpout.java
package com.sdint.basicstorm;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DataSpout extends BaseRichSpout {
private TopologyContext context;
private SpoutOutputCollector collector;
//---logger
private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
@Override
public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
this.context = tc;
this.collector = soc;
System.out.println("\n\n\nopen of DataSpout\n\n\n");
}
public DataSpout() {
System.out.println("\n\n\nDataSpout constructor called\n\n\n");
}//ctor
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");
ofd.declare(new Fields("line"));
}
@Override
public void nextTuple() {
System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");
Long newTupleValue = System.currentTimeMillis() % 1000;
this.collector.emit(new Values(newTupleValue), newTupleValue);
System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
pause(1);
}
public void pause(int timeToPause_InSeconds) {
try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);}
catch (InterruptedException e) {System.out.println(e.getCause());}
}
}//class
and ProcessingBolt.java
package com.sdint.basicstorm;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class ProcessingBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
}
@Override
public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
collector = oc;
}
@Override
public void execute(Tuple tuple) {
System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
collector.ack(tuple);
}
}
Your build.gradle file would look like this (if you choose to create a Gradle project; you won't need this if you're creating a simple Java project):
apply plugin: 'java'
apply plugin: 'eclipse'
defaultTasks 'jar'
jar {
from {
(configurations.runtime).collect {
it.isDirectory() ? it : zipTree(it)
}
}
manifest {
attributes 'Main-Class': 'com.sdint.basicstorm.BasicStorm'
}
}
sourceCompatibility = '1.8'
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'
if (!hasProperty('mainClass')) {
ext.mainClass = 'com.sdint.basicstorm.BasicStorm'
}
repositories {
mavenCentral()
}
dependencies {
//---apache storm
compile 'org.apache.storm:storm-core:1.0.0' //compile 'org.apache.storm:storm-core:0.10.0'
//---logging
compile "org.slf4j:slf4j-api:1.7.12"
compile 'ch.qos.logback:logback-classic:1.1.3'
compile 'ch.qos.logback:logback-core:1.1.3'
testCompile group: 'junit', name: 'junit', version: '4.10'
}
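If your project directory is named BasicStorm, building and running the fat jar should be as simple as this (the jar name comes from the Gradle project name, so adjust it if yours differs):
gradle jar
java -jar build/libs/BasicStorm.jar
The jar block above bundles all runtime dependencies into the jar and sets BasicStorm as the Main-Class, which is why the plain java -jar invocation works.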
Hope you've already read a little about how Spouts and Bolts work in Storm.
This is what happens in BasicStorm:
- The moment a Spout or Bolt is instantiated, its declareOutputFields function gets called. For our simple program we don't need to know what it is, so let's ignore it for now.
- When the topology is submitted to Storm, the open function gets called for Spouts, and this function gets called only once for a Spout instance.
- For Bolts, the equivalent of open is the prepare function. You can use open and prepare to do initializations, declarations, etc. for your program.
- After submitting the topology, we pause for a while in main() (pause(120);) so that Storm gets time to run the topology. This is the time during which Storm calls nextTuple() of the Spout, and when the Spout emits a Tuple, the Tuple is sent to the Bolt (because in main() we configured the Bolt to receive Tuples from the Spout; see the line builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");).
- When the Bolt receives the value, the execute() function of the Bolt is called.
- BasicStorm is designed for a simple task. DataSpout emits a Long value (the tuple), ProcessingBolt receives it, and ProcessingBolt acknowledges (the collector.ack(tuple); line) that it has received the Long value and that the data processing for the tuple is complete.
- When DataSpout receives the acknowledgement, Storm calls its nextTuple() again and another tuple gets emitted.
- This process keeps going on for the 120 seconds we have paused the main() thread for. After that, the topology shuts down.
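Putting those steps together, the console output from the println calls in the code follows roughly this sequence ("123" is just a sample value, and the exact interleaving varies from run to run):
DataSpout constructor called
declareoutputfields of DataSpout
declareOutputFields of ProcessingBolt called
Topology submitted
open of DataSpout
prepare of ProcessingBolt called
nexttuple of DataSpout
Emitting 123
Tuple received in ProcessingBolt:<the tuple's contents>
nexttuple of DataSpout
...and so on, until the 120 seconds are up.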
Try tweaking the values here and there to find out how the program works. Try substituting some other value in place of the Long for the Tuple. Try substituting it with a class object. Remember that if you do, that class will have to implement Serializable.
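For instance, a hypothetical value class (the name and fields here are made up purely for illustration) would need to look something like this before it can be emitted inside a Values object:
package com.sdint.basicstorm;
import java.io.Serializable;
public class SensorReading implements Serializable {
private final long timestamp;//when the reading was taken
private final double reading;//the measured value
public SensorReading(long timestamp, double reading) {
this.timestamp = timestamp;
this.reading = reading;
}
public long getTimestamp() {return timestamp;}
public double getReading() {return reading;}
}
You would then emit it from the Spout with something like collector.emit(new Values(new SensorReading(System.currentTimeMillis(), 42.0)), msgId);.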
One more thing to try out:
Try commenting out collector.ack(tuple); in ProcessingBolt.java. You'll basically not be telling the Spout that the tuple has been received by the Bolt, so after some time the Spout will emit the tuple again. The interval of time the Spout waits for an acknowledgement (ack) is normally 30 seconds, but if you scroll up you'll see that in main() we set the timeout to 10 seconds (config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);), so the Spout waits just ten seconds before emitting the tuple again.
Continued in Part 2
06 April 2016
A comfortable Git branching model
Git has a very flexible approach toward branching and workflows. The best I've seen yet.
Novices would probably be happy using the popular branching model Vincent Driessen has shown on his webpage: http://nvie.com/posts/a-successful-git-branching-model/
I've learnt from and liked Vincent's work, but in the course of working with his branching model, I've found a slightly different model more comfortable, especially when collaborating with other developers.
What I propose to be different...
...is simply that instead of using the master branch to maintain the "stable" version of the code, it is more comfortable to have the master branch be the branch that the entire team collaborates on.
A separate branch named "stable" (see, even the name is more appropriate than "master") is created and also pushed to the remote/blessed repository, and that's the branch that keeps the stable version of your code that is ready for release.
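Creating and publishing the stable branch is just (assuming your remote is named origin):
git checkout master
git branch stable
git push -u origin stable
And whenever the code on master is ready for a release, you'd fold it into stable with something like:
git checkout stable
git merge --no-ff master
git push origin stable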
It's not only about the comfort that the master branch affords you for collaborative development, i.e. simply using "git pull" instead of "git pull origin development" and so on.
It's also the fact that you'll have to do fewer merges between branches, which could end up looking like this:
The above screenshot is from GitUp, one of the best Git clients I've seen. Once you do a merge, you can actually undo it too (it's only available for macOS though; when I wanted to port it to Linux myself, I saw the developer's post saying he'd used too much Objective-C code for the graphics to be able to port it).