18 September 2016

Concepts about Storm you need to know [Part 2]

There are some concepts that are always good to know when you're tearing your hair out over how Storm works.


YAML correctness:

Storm's config files use YAML to define properties. So if you get an error like this when modifying the ZooKeeper servers:
 Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.storm.config$read_storm_config.invoke(config.clj:78)
    at org.apache.storm.command.list$_main.invoke(list.clj:22)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at org.apache.storm.command.list.main(Unknown Source)
Caused by: while scanning a simple key
 in 'reader', line 2, column 1:
    storm.zookeeper.port:2181
    ^
could not found expected ':'
 in 'reader', line 3, column 1:
    nimbus.host: "localhost"
    ^
which was caused by this configuration:
storm.zookeeper.servers: - "127.0.0.1"
nimbus.seeds: ["127.0.0.1"]

 
It's because you didn't conform to YAML syntax. You can validate the file at http://www.yamllint.com/ or any other YAML validator website.
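For reference, the same settings written as valid YAML (list items go on their own lines, and there is a space after every colon) would be:

storm.zookeeper.servers:
  - "127.0.0.1"
storm.zookeeper.port: 2181
nimbus.seeds: ["127.0.0.1"]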


Logviewer:
When viewing Storm results from the Storm UI (especially on localhost), you can click the port numbers (which are hyperlinks) to see the contents of the log files that Storm writes its output to.

This works only if you have started the logviewer, which is basically just another Storm process you have to start.
Go to Storm's bin folder and type ./storm logviewer
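As far as I know, the logviewer serves the logs on port 8000 by default. If something else on your machine is already using that port, you can change it in storm.yaml:

logviewer.port: 8000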


Storm's slots:
Storm, by default, provides four "slots" on each node you run it on. Why only four ports?
A slot is used by a worker, and a worker is a JVM, so each worker needs a clump of heap memory of its own. The default of four slots therefore uses 4*x memory, where x is the memory used by one worker JVM.
Now obviously, if you declare more than four ports, Storm will take up that much more memory. The problem with taking up too much memory is that your topologies will suddenly crash with a "GC overhead limit exceeded" exception and the spout or bolt will be restarted constantly. As I understand it, you'd be better off increasing the number of servers, or the RAM on the existing server, if you want to run many workers/topologies.
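The slots are simply the worker ports listed in storm.yaml under supervisor.slots.ports. The four defaults look like this, and adding or removing ports changes the number of slots on that node:

supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703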


Setting the amount of memory used by a topology:
From what I know, you can figure out how much memory your application needs and allocate it as shown below.
The code below allocates 2 GiB of memory to each worker of the topology.

Config stormConfig = new Config();
stormConfig.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");
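From what I've seen, there is also a cluster-wide equivalent in storm.yaml, worker.childopts, which applies to the workers of every topology on that cluster:

worker.childopts: "-Xmx2g"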


Fields grouping does not get overwhelmed with data:

You might think that the way Storm keeps track of unique field values for fields grouping is by holding a HashMap of field values to bolt task IDs. That's not how it works.

This is the internal Storm code for fields grouping:

public List<Integer> chooseTasks(int taskId, List<Object> values) {
    int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
    return Collections.singletonList(targetTasks.get(targetTaskIndex));
}


TupleUtils.listHashCode leads to
public static <T> int listHashCode(List<T> alist) {
  if (alist == null) {
      return 1;
  } else {
      return Arrays.deepHashCode(alist.toArray());
  }
}


So the task a tuple is sent to is independent of which spout emits it, and Storm doesn't store a map of values anywhere. All that matters is the hash of the values of the fields you group on, taken modulo the number of tasks.
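To convince yourself that no bookkeeping is needed, here is a small standalone sketch (plain Java mimicking the hash-and-modulo logic above, not Storm's actual classes). The same field value always maps to the same task index, no matter how many distinct values flow through, so memory use doesn't grow with the number of unique keys:

import java.util.Arrays;

// Standalone sketch of Storm's fields-grouping choice: hash the grouped field
// values and take the result modulo the number of target tasks.
public class FieldsGroupingSketch {

    static int chooseTask(int numTasks, Object... groupedFieldValues) {
        // Same idea as TupleUtils.listHashCode(...) % numTasks
        return Math.abs(Arrays.deepHashCode(groupedFieldValues)) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 5;
        System.out.println(chooseTask(numTasks, 2.0)); // prints an index between 0 and 4
        System.out.println(chooseTask(numTasks, 2.0)); // always the same index as above
        System.out.println(chooseTask(numTasks, 7.0)); // may print a different index
    }
}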



Understanding Fields Grouping in Apache Storm [Part 3]

Fields grouping can be confusing to understand from the examples on the internet, because they all have some strings in the field declarations which just don't seem to make any sense, given that we are supposed to send tuples based on some "fields".

So basically the concept is this:
  • In shuffle grouping, you send any tuple you want from a spout and Storm will decide which of the numerous bolt tasks it should reach.
  • In fields grouping, you send the same tuple from the spout but also specify a field whose value acts as a kind of ID, based on which Storm can, for example, send all tuples with id1 always to bolt task 3, tuples with id4 always to bolt task 5, and tuples with id6 also always to bolt task 5. Multiple IDs can map to the same bolt task, but each ID will keep being sent to the same task.
Most examples will show you a tuple as a string and the string itself will be the unique ID based on which fields grouping is done.

But that does not have to be the case.

You can even send other values as the fields of the tuple and specify one of those fields as the unique ID that Storm should use for doing fields grouping. 

A "field" by definition is:  "Data encapsulated in a class or object".

So when you specify fields for a fields grouping, you are first declaring to Storm that you are creating a Tuple object with certain fields, and you give those fields whatever names you choose:

ofd.declare(new Fields(FieldName1, FieldName2, FieldName3));

This is pretty much (although not exactly) like declaring an object like this (pseudocode):

object Tuple {
  {FieldName1: Array[0]};
  {FieldName2: Array[1]};

  {FieldName3: Array[2]};
}

And then when emitting, you specify the values that you want to store in those array positions:
collector.emit(new Values(tup, randNum, spoutId), tup);

object Tuple {
  {FieldName1: Array[0] = new MyData();};
  {FieldName2: Array[1] = 50.46334};

  {FieldName3: Array[2] = 67}; 
}

You can give any String name to FieldName1, 2 and 3. You can also store any object in those array positions.

Now if you want to tell Storm to use one of those field names to do the Tuple grouping, you simply specify the FieldName when declaring the topology. Done like this:
 .fieldsGrouping(spoutName, new Fields(FieldName2));


That's it. It's that simple. The relevant lines are shown in the code below.

BasicStorm.java
package com.sdint.basicstorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class BasicStorm {

    public static void main(String[] cmdArgs) {

        Config config = new Config();
        config.put(Config.TOPOLOGY_DEBUG, false);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//lowers the default 30-second tuple timeout to 10 seconds

        final String spoutName = DataSpout.class.getSimpleName();
        final String boltName = ProcessingBolt.class.getSimpleName();
        final Integer numberOfSpouts = 1;
        final Integer numberOfBolts = 5;
        String someRandomNumber = "someRandomNumber";
        String theSpoutId = "theSpoutId";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutName, new DataSpout(), numberOfSpouts)
               .setNumTasks(numberOfSpouts);
        builder.setBolt(boltName, new ProcessingBolt(), numberOfBolts)
               .setNumTasks(numberOfBolts)
               .fieldsGrouping(spoutName, new Fields(someRandomNumber));

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BasicStorm", config, builder.createTopology());

        System.out.println("\n\n\nTopology submitted\n\n\n");
        StopWatch.pause(60*3);//pause for x seconds during which the emitting of tuples will happen

        //localCluster.killTopology("BasicStorm");
        localCluster.shutdown();
    }
}


DataSpout.java
package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private transient Integer spoutId;   
    private transient Integer numberOfEmits;
   
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
   
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;
        this.spoutId = context.getThisTaskId();
        this.numberOfEmits = 0;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        String someRandomNumber = "someRandomNumber";//these field strings have to be exactly the same wherever you specify them
        String theSpoutId = "theSpoutId";
        String aTupleName = "aTupleName";
        ofd.declare(new Fields(aTupleName, someRandomNumber, theSpoutId));
    }

    @Override
    public void nextTuple() {
       
        if (++numberOfEmits < 200) {
            Double randNum = Math.floor(Math.random() * 10);
            SomeTuple tup = new SomeTuple();
            collector.emit(new Values(tup, randNum, spoutId), tup);//(tuple, uniqueID for FieldsGrouping, spoutID), unique messageId
            logger.info("Emitting {}",randNum);
        }
        else {logger.info("NO MORE EMITS");}
       
        StopWatch.pauseMilli(300);
    }
   
    @Override
    public void ack(Object msgId) {
    }   
   
    @Override
    public void fail(Object msgId) {
    }
   
}


ProcessingBolt.java
package com.sdint.basicstorm;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessingBolt extends BaseRichBolt {

    private OutputCollector collector;
    private transient Integer boltId;
    private final Logger logger = LoggerFactory.getLogger(ProcessingBolt.class);
    private HashMap<Double, Integer> counter;//counts how many times each number has been received

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        collector = oc;
        boltId = tc.getThisTaskId();
        counter = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {

        //From the spout, Values(tup, randNum, spoutId) were sent, so we receive them here
        SomeTuple tup = (SomeTuple) tuple.getValue(0);//the first value in the fields
        Double num = (Double) tuple.getValue(1);//the second value sent in the fields
        Integer spoutId = (Integer) tuple.getValue(2);//the third value sent in the fields
        //Similarly, you could have sent more values from the spout and received it as tuple.getValue(3) or (4) etc.
        logger.info("Bolt {} received {} from spout {}", boltId, num, spoutId);
        CountTheNumberOfSimilarNumbersReceived(num);
        DisplayCounts();

        collector.ack(tuple);
    }

    private void CountTheNumberOfSimilarNumbersReceived(final Double num) {
        if (counter.containsKey(num)) {
            Integer existingCount = counter.get(num);
            counter.put(num, ++existingCount);//increment the count for this number
        } else {
            counter.put(num, 1);//first time this number has been received
        }
    }

    private void DisplayCounts() {
        logger.info("\n\n\n\nBolt {} has", boltId);
        for (Map.Entry<Double, Integer> oneVal : counter.entrySet()) {
            logger.info("{} number of {}", oneVal.getValue(), oneVal.getKey());
        }
        logger.info("\n\n\n");
    }

}


  • The tuple is the entire bunch of data being emitted, i.e. the object that is emitted in the spout (new Values(tup, randNum, spoutId)). That's the same object that reaches the bolt (public void execute(Tuple tuple)).
  • The fields are the objects that form the tuple (the SomeTuple object, the random number and the spout Id).
  • Grouping of tuples (sending them to specific bolt tasks) is done based only on the random number field, because that's what you specified in the line .fieldsGrouping(spoutName, new Fields(someRandomNumber));

So if random number is 1.0, the tuple might be sent to bolt 5.
If random number is 2.0, the tuple might be sent to bolt 3.
If random number is 2.0 again, the tuple will be sent to bolt 3.

That's all there is to it. If you want, you can even have the random number alone as the tuple. It could also be a String instead of a number, or any object you want. You can also re-order the fields in the tuple; just make sure the order is the same in the declareOutputFields function.

You also need to know that fields grouping does not overwhelm Storm with data.



17 September 2016

How gravity based water filters purify water


Disclaimer: I am not an expert on chemicals or water filters. What is mentioned in this particular post about chemicals is a perception based on information sourced from the internet.


If you use an RO (Reverse Osmosis) filter and are familiar with the process of osmosis, then all you need to know is that RO is osmosis in the reverse direction, which happens when the water pressure is high enough. The heavy metals and salts in the water are retained within the confines of the membrane, and the water (along with the chlorine dissolved in it) gets across the membrane.
But gravity-based filters aren't like that. They depend on just gravity to take the water through a few filters and voila! Somehow, the water is purified. If you ask the company how bacteria and viruses are killed, you'll get at best an ambiguous answer. In fact, an investigation into a certain company's claim of killing 1 crore viruses showed it to be exaggerated.


So the first thing you learn about modern water filter companies is:
Don't trust their marketing. Ask extremely detailed questions. If you don't get proper answers, don't buy the product.

Backstory:
My old water filter stopped functioning, and I had to get a new temporary water filter quickly. Going by the reviews on Amazon, a certain brand's advanced gravity-based water filter seemed okay at the time, because I didn't need to filter borewell water (which would've had metallic impurities). I just needed it to filter the chlorinated corporation water. I ordered it via the company helpline and it was promptly delivered.

After a few days of drinking it, I got an uneasy feeling in my stomach and lost my appetite. It was as though the stomach needed time to repair damage it had sustained. Oddly, even the construction workers we gave the water to refused to drink it after a few days.
On reporting it, a company technician came to inspect it, said it might be a problem with the carbon polisher and, surprisingly, he himself refused to drink the water, saying he'd had throat problems after drinking the water at various customers' houses. He seemed familiar with the problem.
After replacing the carbon polisher, the problem continued. This time a different technician arrived, and he too refused to drink the water. He said he was familiar with the problem, replaced the 'germ-kill' kit and asked me to try using the filter again. The problem continued, and I had to ask them to take back the product. They took it back after refunding half the price.


What a gravity-based purifier may rely on:

1. A Germkill processor: This is essentially nothing but bleaching powder. The same white powder used to sanitize swimming pools is used to disinfect your drinking water. It basically releases chlorine into the water, which kills microorganisms. The problem is, if your water is already chlorinated by your municipal corporation, then the water purifier just adds more chlorine, and that can be bad for your stomach.
 
In the case of this product, the germkill processor held the chemical in a little container which would slowly dissolve into the water. When it ran out, the red piece of plastic at the top would have moved to the bottom, visible to the user as a sign that the germkill kit needed replacement.





2. An advanced microfiber mesh: It's nothing but a porous material which lets water through and doesn't let dirt through. Nothing advanced about it. It gets dirty quickly, and the technician advised cleaning it every week. So honestly, it does the job, but cleaning it every week is the reality.



3. Carbon polisher: This is a carbon-based fabric that's supposed to remove pesticides, parasites (really? So the germkill kit doesn't work?) and chlorine, but that also means you need to change your carbon cartridge often because it can't adsorb (not absorb) chemicals forever.



4. Microcharged membrane: Just another fancy name, which claims to remove harmful parasites (really? So the germkill kit and the carbon polisher didn't work yet?). It's a hollow cylinder with a clump of white wires at the bottom. The technician told me it was there to give the water a sweet taste.

(Pictures of the membrane from the outside and the inside.)

So that's the process. Chlorinate the water, remove dirt, remove chlorine, add a sweet taste. This is claimed to be "better than boiling", but it'd take a very gullible person to believe that.

The most visible and alarming indicator was that their own technicians were refusing to drink the "purified" water. I wrote to the company requesting them to do further checks and if necessary, initiate a product recall in the interest of public safety.

As for you: if you get municipal corporation water, boiling it is a much more reliable way to get rid of bacteria. Although boiling doesn't get rid of all pathogens, it does reduce them to a safe level. An acquaintance who worked for a water purifier company told me that the old water filters used to be much better and more reliable than the modern ones. The modern ones bank on getting money out of you through annual maintenance contracts and components like the germkill kit and carbon polisher which need frequent replacement.

We need a better solution than relying on unethical corporations for a basic need such as clean drinking water.



10 September 2016