Sunday, September 18, 2016

Concepts about Storm you need to know [Part 2]

There are some concepts that are always good to know when you are tearing your hair over the working of Storm.

Yaml correctness:

Storm's config files use yaml to define properties, so if you get an error like this when modifying zookeeper servers:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.storm.config$read_storm_config.invoke(config.clj:78)
    at org.apache.storm.command.list$_main.invoke(list.clj:22)
    at clojure.lang.AFn.applyToHelper(
    at clojure.lang.AFn.applyTo(
    at org.apache.storm.command.list.main(Unknown Source)
Caused by: while scanning a simple key
 in 'reader', line 2, column 1:
could not found expected ':'
 in 'reader', line 3, column 1: "localhost"
caused when
storm.zookeeper.servers: - ""
nimbus.seeds: [""]
It's because you didn't conform to the yaml syntax. You can validate it at or any other yaml validator website.

When viewing Storm results from the Storm UI (especially from localhost), you are allowed to click on the port numbers (which are hyperlinks) and you'll be able to see the contents of the log files to which storm logs output.

This works only if you have started the logviewer. This is basically just a storm process you have to start.
Goto the storm bin folder and type ./storm logviewer

Storm's slots:
Storm, by default provides 4 "slots" on each node that you run it on. Why only 4 ports?
A slot is used by a worker. A worker is a JVM. So each JVM would require a clump of heap memory of its own. So a default of 4 slots would use 4*x amount of memory, where x is the memory used by a worker JVM.
Now obviously if you declare more than 4 ports, it'll take up that much more memory. The problem with taking up too much memory, is that your topologies will suddenly crash with a GC overhead limit exceeded exception and the spout or bolt will get re-started constantly. As I understand, you'd be better off with increasing the number of servers or RAM on the existing server, if you want to have many workers/topologies.

Setting the amount of memory used by a topology:
From what I know, you can figure out how much memory your application needs and allocate the memory as shown below.
The below code allocates 2GiB of memory for each worker of the topology.

Config stormConfig = new Config(); stormConfig.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");

Fields grouping does not get overwhelmed with data:

We would think that the way Storm keeps track of unique fields for fields grouping is by having a HashMap of the fields and bolt Id's. That's not how it works.

This is the internal Storm code for fields grouping:

public List chooseTasks(int taskId, List values) {
int targetTaskIndex = Math.abs(TupleUtils.listHashCode(, values))) % numTasks;
return Collections.singletonList(targetTasks.get(targetTaskIndex));

TupleUtils.listHashCode leads to
public static int listHashCode(List alist) {
  if (alist == null) {
      return 1;
  } else {
      return Arrays.deepHashCode(alist.toArray());

So the function is independent of which Spout emits it. What matters is the field name based on which the fields grouping is done.

More help:

No comments: