Sunday, September 18, 2016

Understanding Fields Grouping in Apache Storm [Part 3]

Fields grouping can be confusing to understand from the examples on the internet, because they all declare fields with some arbitrary strings that just don't seem to make sense, given that we are supposed to send tuples based on some "fields".

So basically the concept is this:
  • In shuffle grouping, you emit any tuple you want from a spout, and Storm decides which of the many bolt tasks it should reach.
  • In fields grouping, you emit the same tuple from the spout but also specify an ID based on which Storm routes it: for example, tuples with id1 always go to bolt 3, tuples with id4 always go to bolt 5, and tuples with id6 also always go to bolt 5. Multiple IDs can map to the same bolt, but each ID keeps going to the same bolt.
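Conceptually, the routing can be sketched like this (a hypothetical illustration, not Storm's actual internal code): the grouping field's value is hashed, and the hash modulo the number of bolt tasks picks the target task, so equal values always land on the same task.

```java
public class FieldsGroupingSketch {
    // Hypothetical sketch of fields grouping: hash the grouping field's value
    // and take it modulo the number of bolt tasks. Storm's real implementation
    // differs in detail, but the consequence is the same: equal field values
    // are always routed to the same bolt task.
    static int targetTask(Object groupingFieldValue, int numBoltTasks) {
        return Math.floorMod(groupingFieldValue.hashCode(), numBoltTasks);
    }

    public static void main(String[] args) {
        int numBoltTasks = 5;
        // The same ID always maps to the same bolt task:
        System.out.println(targetTask("id1", numBoltTasks) == targetTask("id1", numBoltTasks)); // true
        // Different IDs may or may not share a task:
        System.out.println(targetTask("id4", numBoltTasks));
        System.out.println(targetTask("id6", numBoltTasks));
    }
}
```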
Most examples will show you a tuple as a string and the string itself will be the unique ID based on which fields grouping is done.

But that does not have to be the case.

You can even send other values as the fields of the tuple and specify one of those fields as the unique ID that Storm should use for doing fields grouping. 

A "field" by definition is:  "Data encapsulated in a class or object".

So when you specify fields for a fields grouping, you first declare to Storm that your Tuple objects will have certain fields, and you give those fields names of your choosing:

ofd.declare(new Fields(FieldName1, FieldName2, FieldName3));

This is pretty much (although not exactly) like declaring an object like this (pseudocode):

object Tuple {
  FieldName1: Array[0]
  FieldName2: Array[1]
  FieldName3: Array[2]
}

And then when emitting, you specify the values that you want to store in those array positions:
collector.emit(new Values(tup, randNum, spoutId), tup);

object Tuple {
  FieldName1: Array[0] = new MyData()
  FieldName2: Array[1] = 50.46334
  FieldName3: Array[2] = 67
}

You can give any String name to FieldName1, 2 and 3. You can also store any object in those array positions.

Now if you want to tell Storm to use one of those field names to group the tuples, you simply specify the field name when declaring the topology, like this:
 .fieldsGrouping(spoutName, new Fields(FieldName2));


That's it. It's that simple. The full working code is below.

BasicStorm.java
package com.sdint.basicstorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class BasicStorm {

    public static void main(String[] cmdArgs) {

        Config config = new Config();
        config.put(Config.TOPOLOGY_DEBUG, false);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//lowers the default tuple timeout from 30 seconds to 10 seconds

        final String spoutName = DataSpout.class.getSimpleName();
        final String boltName = ProcessingBolt.class.getSimpleName();
        final Integer numberOfSpouts = 1;
        final Integer numberOfBolts = 5;
        String someRandomNumber = "someRandomNumber";
        String theSpoutId = "theSpoutId";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutName, new DataSpout(), numberOfSpouts)
               .setNumTasks(numberOfSpouts);
        builder.setBolt(boltName, new ProcessingBolt(), numberOfBolts)
               .setNumTasks(numberOfBolts)
               .fieldsGrouping(spoutName, new Fields(someRandomNumber));

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BasicStorm", config, builder.createTopology());

        System.out.println("\n\n\nTopology submitted\n\n\n");
        StopWatch.pause(60 * 3);//pause for x seconds during which the emitting of tuples will happen

        //localCluster.killTopology("BasicStorm");
        localCluster.shutdown();
    }
}


DataSpout.java
package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private transient Integer spoutId;   
    private transient Integer numberOfEmits;
   
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
   
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;
        this.spoutId = context.getThisTaskId();
        this.numberOfEmits = 0;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        String someRandomNumber = "someRandomNumber";//these field strings have to be exactly the same wherever you specify them
        String theSpoutId = "theSpoutId";
        String aTupleName = "aTupleName";
        ofd.declare(new Fields(aTupleName, someRandomNumber, theSpoutId));
    }

    @Override
    public void nextTuple() {
       
        if (++numberOfEmits < 200) {
            Double randNum = Math.floor(Math.random() * 10);
            SomeTuple tup = new SomeTuple();
            collector.emit(new Values(tup, randNum, spoutId), tup);//(tuple, uniqueID for FieldsGrouping, spoutID), unique messageId
            logger.info("Emitting {}",randNum);
        }
        else {logger.info("NO MORE EMITS");}
       
        StopWatch.pauseMilli(300);
    }
   
    @Override
    public void ack(Object msgId) {
    }   
   
    @Override
    public void fail(Object msgId) {
    }
   
}


ProcessingBolt.java
package com.sdint.basicstorm;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessingBolt extends BaseRichBolt {

    private OutputCollector collector;
    private transient Integer boltId;
    private final Logger logger = LoggerFactory.getLogger(ProcessingBolt.class);
    private HashMap<Double, Integer> counter;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        collector = oc;
        boltId = tc.getThisTaskId();
        counter = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {

        //From the spout, Values(tup, randNum, spoutId) were sent, so we receive them here
        SomeTuple tup = (SomeTuple) tuple.getValue(0);//the first value in the fields
        Double num = (Double) tuple.getValue(1);//the second value sent in the fields
        Integer spoutId = (Integer) tuple.getValue(2);//the third value sent in the fields
        //Similarly, you could have sent more values from the spout and received it as tuple.getValue(3) or (4) etc.
        logger.info("Bolt {} received {} from spout {}", boltId, num, spoutId);
        CountTheNumberOfSimilarNumbersReceived(num);
        DisplayCounts();

        collector.ack(tuple);
    }

    private void CountTheNumberOfSimilarNumbersReceived(final Double num) {
        if (counter.containsKey(num)) {
            Integer existingCount = counter.get(num);
            counter.put(num, ++existingCount);//increment the count for this number
        } else {
            counter.put(num, 1);//start the count at 1 for a newly seen number
        }
    }

    private void DisplayCounts() {
        logger.info("\n\n\n\nBolt {} has", boltId);
        for (Map.Entry<Double, Integer> oneVal : counter.entrySet()) {
            logger.info("{} number of {}", oneVal.getValue(), oneVal.getKey());
        }
        logger.info("\n\n\n");
    }

}
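Incidentally, the per-number bookkeeping done in CountTheNumberOfSimilarNumbersReceived can be written more compactly. This Storm-free sketch uses HashMap.merge, which inserts 1 the first time a key is seen and adds 1 on every later sighting:

```java
import java.util.HashMap;

public class CountSketch {
    public static void main(String[] args) {
        // Same bookkeeping as the bolt's counter: insert 1 on first sight,
        // add 1 on every repeat.
        HashMap<Double, Integer> counter = new HashMap<>();
        for (double num : new double[]{2.0, 3.0, 2.0, 2.0}) {
            counter.merge(num, 1, Integer::sum);
        }
        System.out.println(counter.get(2.0)); // 3
        System.out.println(counter.get(3.0)); // 1
    }
}
```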


  • The tuple is the entire bunch of data being emitted, i.e. the object emitted in the spout (new Values(tup, randNum, spoutId)). That's the same object that reaches the bolt (public void execute(Tuple tuple)).
  • The fields are the objects that form the tuple (the SomeTuple object, the random number and the spout ID).
  • Grouping of tuples (sending them to specific bolts) is done based only on the random number field, because that's what you specified in the line .fieldsGrouping(spoutName, new Fields(someRandomNumber));

So if random number is 1.0, the tuple might be sent to bolt 5.
If random number is 2.0, the tuple might be sent to bolt 3.
If random number is 2.0 again, the tuple will be sent to bolt 3.

That's all there is to it. If you want, the random number alone can be the tuple. It could also be a String instead of a number, or any object you want. You can also re-order the fields in the tuple; just make sure the same order is used in the declareOutputFields function.
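One more thing worth knowing about field names: the bolt above reads values positionally with tuple.getValue(0), (1) and (2), but Storm's Tuple also offers lookups by declared field name (e.g. tuple.getValueByField("someRandomNumber")), which keeps working even if you re-order the fields. The mechanism is just a name-to-position pairing, as this Storm-free sketch shows (the field names are the ones declared in DataSpout):

```java
import java.util.Arrays;
import java.util.List;

public class ByFieldSketch {
    // Minimal stand-in for Storm's Fields/Values pairing: the declared field
    // names map positionally onto the emitted values, which is what allows
    // a bolt to look a value up by name instead of by index.
    static Object getValueByField(List<String> fields, List<Object> values, String name) {
        return values.get(fields.indexOf(name));
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("aTupleName", "someRandomNumber", "theSpoutId");
        List<Object> values = Arrays.asList("someTupleObject", 50.46334, 67);
        System.out.println(getValueByField(fields, values, "someRandomNumber")); // 50.46334
        System.out.println(getValueByField(fields, values, "theSpoutId")); // 67
    }
}
```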

You also need to know that fields grouping does not overwhelm Storm with data: it only decides which bolt task each tuple is routed to, without copying or buffering the tuples.

