Question

I'm having a hard time understanding how to provide values to Storm, since I'm new to Storm.

I started with the starter kit. I looked at TestWordSpout, where the following code provides new values:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

So I see it emits one word at a time with _collector.emit(new Values(word));

How can I provide a collection of words directly? Is this possible?

TestWordSpout.java

What I mean is: each time nextTuple is called, a new word is selected at random from the list and emitted. Over time, the emitted words may look like this:

@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels

What if I already have this list as a collection and just want to feed it to Storm?


Solution

The Values type accepts any number of objects of any type.

So you can simply emit a List, for instance from the execute method of a Bolt or from the nextTuple method of a Spout:

List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));
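One subtlety of the varargs constructor is worth a sketch: passing the List itself yields a one-field tuple whose single value is the list, while passing an Object[] (for example words.toArray()) spreads each element into its own field. The sketch below uses a minimal stand-in class that mirrors Storm's Values (an ArrayList<Object> filled from a varargs constructor) so it runs without the Storm jar; in a real topology you would use Storm's own Values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ValuesDemo {

    // Minimal stand-in for Storm's Values class (an ArrayList<Object> built from varargs),
    // so this sketch runs without the Storm jar on the classpath.
    static class Values extends ArrayList<Object> {
        Values(Object... vals) {
            super(vals.length);
            for (Object o : vals) {
                add(o);
            }
        }
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>(Arrays.asList("one word", "another word"));

        // The whole list is ONE value, so the tuple has a single field.
        Values oneField = new Values(words);
        System.out.println(oneField.size()); // 1

        // An Object[] is used directly as the varargs array,
        // so each word becomes its own field.
        Values manyFields = new Values(words.toArray());
        System.out.println(manyFields.size()); // 2
    }
}
```

With the single-field form you declare one output field (such as "collection"); the spread form would need one declared field per word, which rarely suits a variable-length list.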

You can add a new field too; just be sure to declare it in the declareOutputFields method:

_collector.emit(new Values(words, "a new field value!"));

And in your declareOutputFields method:

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}

In the next Bolt of the topology, you can read the fields from the Tuple object passed to its execute method:

List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");
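The receiving bolt finds the list under "collection" because a tuple's values are matched to the declared field names by position. The sketch below models that with a hypothetical SimpleTuple class (not Storm's Tuple), using the same field names and values as the snippets above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FieldLookupDemo {

    // Hypothetical stand-in for a Storm tuple: the declared field names
    // (what declareOutputFields registers) paired with the emitted values.
    static final class SimpleTuple {
        private final List<String> fieldNames;
        private final List<Object> values;

        SimpleTuple(List<String> fieldNames, List<Object> values) {
            if (fieldNames.size() != values.size()) {
                throw new IllegalArgumentException(
                        "expected " + fieldNames.size() + " values but got " + values.size());
            }
            this.fieldNames = fieldNames;
            this.values = values;
        }

        // Look up a value by the position of its declared field name.
        Object getValueByField(String field) {
            int index = fieldNames.indexOf(field);
            if (index < 0) {
                throw new IllegalArgumentException("unknown field: " + field);
            }
            return values.get(index);
        }

        String getStringByField(String field) {
            return (String) getValueByField(field);
        }
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>(Arrays.asList("one word", "another word"));

        // Same order as new Fields("collection", "newField") in declareOutputFields.
        SimpleTuple tuple = new SimpleTuple(
                Arrays.asList("collection", "newField"),
                Arrays.<Object>asList(words, "a new field value!"));

        @SuppressWarnings("unchecked")
        List<String> collection = (List<String>) tuple.getValueByField("collection");
        String newFieldValue = tuple.getStringByField("newField");

        System.out.println(collection);    // [one word, another word]
        System.out.println(newFieldValue); // a new field value!
    }
}
```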

OTHER TIPS

Storm is designed and built to process continuous streams of data (see the Rationale page of the Storm documentation). It is unusual to feed a fixed, pre-existing collection of input data into a Storm cluster; the input typically comes from JMS queues, Apache Kafka, Twitter feeds and similar sources. I suspect what you actually want is to pass in a few configuration values, in which case the following applies.

Given Storm's design purpose, usually only a limited amount of configuration is passed to Storm, such as RDBMS connection details (Oracle/DB2/MySQL etc.), JMS provider details (IBM MQ/RabbitMQ etc.), or Apache Kafka/HBase details.

For your particular question, or for providing configuration details for the products above, there are three ways I can think of:

1. Set the configuration details on the Spout or Bolt instance

For example, declare instance variables and assign their values in the Spout/Bolt constructor:

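public class TestWordSpout extends BaseRichSpout {
    // Assigned in the constructor and shipped to the workers with the spout instance
    List<String> listOfValues;

    public TestWordSpout(List<String> listOfValues) {
        this.listOfValues = listOfValues;
    }

    // open(), nextTuple() and declareOutputFields() omitted
}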

In the topology submission class, create the Spout instance with the list of values:

List<String> listOfValues = new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");

builder.setSpout("word", new TestWordSpout(listOfValues), 3);
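The spout instance created here is serialized with Java serialization when the topology is submitted and deserialized in the worker processes, so everything stored in its fields (including listOfValues) must be Serializable; ArrayList is. The sketch below imitates that round trip using only java.io, with a hypothetical WordSpoutState class standing in for the spout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SpoutSerializationDemo {

    // Hypothetical stand-in for TestWordSpout: only the constructor-injected state.
    static class WordSpoutState implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> listOfValues;

        WordSpoutState(List<String> listOfValues) {
            this.listOfValues = listOfValues;
        }
    }

    // Serialize to bytes and read back, similar in spirit to what happens to the
    // spout between topology submission and the worker processes.
    static WordSpoutState roundTrip(WordSpoutState spout) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(spout);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (WordSpoutState) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> listOfValues = new ArrayList<String>();
        listOfValues.add("nathan");
        listOfValues.add("golda");
        listOfValues.add("mike");

        WordSpoutState copy = roundTrip(new WordSpoutState(listOfValues));
        System.out.println(copy.listOfValues); // [nathan, golda, mike]
        // The copy is a different object from the list created in the submitting JVM.
        System.out.println(copy.listOfValues == listOfValues); // false
    }
}
```

For the same reason, non-serializable resources such as database connections are better created in open()/prepare() than in the constructor.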

These values are then available through the listOfValues instance variable in the nextTuple() method.
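As a sketch of how nextTuple() might walk through that list instead of picking words at random, the following keeps an index and emits one word per call until the list is exhausted. The ListWordSpout class is hypothetical, and the Storm collector is replaced by a Consumer so the code runs standalone; inside a real spout the emit line would be _collector.emit(new Values(listOfValues.get(index))).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ListSpoutDemo {

    // Hypothetical sketch of the nextTuple() logic for a spout built with a list.
    // The Storm collector is replaced by a Consumer so the sketch runs standalone.
    static class ListWordSpout {
        private final List<String> listOfValues;
        private int index = 0; // position of the next word to emit

        ListWordSpout(List<String> listOfValues) {
            this.listOfValues = listOfValues;
        }

        // Emits at most one word per call, like nextTuple(); does nothing once the list is exhausted.
        void nextTuple(Consumer<Object> emit) {
            if (index < listOfValues.size()) {
                emit.accept(listOfValues.get(index));
                index++;
            }
        }
    }

    public static void main(String[] args) {
        ListWordSpout spout = new ListWordSpout(Arrays.asList("nathan", "golda", "mike"));
        List<Object> emitted = new ArrayList<>();

        // Storm calls nextTuple() repeatedly; simulate five calls.
        for (int i = 0; i < 5; i++) {
            spout.nextTuple(emitted::add);
        }
        System.out.println(emitted); // [nathan, golda, mike]
    }
}
```

Note that with a parallelism hint of 3, as in builder.setSpout("word", new TestWordSpout(listOfValues), 3), each of the three spout tasks gets its own copy of the list, so every word would be emitted three times unless the tasks split the list between them.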

See the Storm integrations in Storm contrib for how the RDBMS/Kafka etc. configurations are set this way.

2. Set the configurations in getComponentConfiguration(). This method is meant for overriding the topology configuration, but you can also pass in a few details, as below:

@Override
public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> ret = new HashMap<String, Object>();
    if (!_isDistributed) {
        // Local testing: limit this component to a single task
        ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
    }
    // Custom key, added in both modes so open() never sees it missing
    List<String> listOfValues = new ArrayList<String>();
    listOfValues.add("nathan");
    listOfValues.add("golda");
    listOfValues.add("mike");
    ret.put("listOfValues", listOfValues);
    return ret;
}

The configuration details are then available in the conf map passed to the open() method of a Spout or the prepare() method of a Bolt:

@Override
@SuppressWarnings("unchecked")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    // Read the custom entry added in getComponentConfiguration()
    this.listOfValues = (List<String>) conf.get("listOfValues");
}
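A minimal sketch of the path from getComponentConfiguration() to open(), assuming (as a simplification) that the component's entries are layered over the topology configuration before the conf map reaches open()/prepare(). The class and helper names are hypothetical. Reading the value through a helper that fails fast makes a missing key obvious, and casting to the List interface rather than ArrayList is the safer choice, since Storm serializes the component configuration and the value may come back as a different List implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComponentConfDemo {

    // Mirrors the spout's getComponentConfiguration(): returns a custom "listOfValues" entry.
    static Map<String, Object> getComponentConfiguration() {
        Map<String, Object> ret = new HashMap<String, Object>();
        ret.put("listOfValues", new ArrayList<String>(Arrays.asList("nathan", "golda", "mike")));
        return ret;
    }

    // Simplified assumption: component entries are layered over the topology conf
    // before open()/prepare() receives it.
    static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                             Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<String, Object>(topologyConf);
        merged.putAll(componentConf);
        return merged;
    }

    // What open() does with the conf map: pull the list back out by key, failing fast if absent.
    @SuppressWarnings("unchecked")
    static List<String> readListOfValues(Map<String, Object> conf) {
        Object raw = conf.get("listOfValues");
        if (raw == null) {
            throw new IllegalStateException("listOfValues missing from conf");
        }
        return (List<String>) raw;
    }

    public static void main(String[] args) {
        Map<String, Object> topologyConf = new HashMap<String, Object>();
        topologyConf.put("topology.debug", false);

        Map<String, Object> conf = effectiveConf(topologyConf, getComponentConfiguration());
        System.out.println(readListOfValues(conf)); // [nathan, golda, mike]
    }
}
```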

3. Declare the configuration in a properties file and package it in the jar file that is submitted to the Storm cluster. Nimbus distributes the jar to the worker nodes, where it is on the classpath of the worker processes running your spouts and bolts. The open()/prepare() method can then read the properties file and assign the values to instance variables.
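A sketch of option 3, assuming a hypothetical spout.properties file with a comma-separated words entry, packaged at the root of the topology jar. loadWords parses any InputStream, so it can be exercised without a jar; loadWordsFromClasspath shows how open()/prepare() could locate the file through the class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class PropertiesConfigDemo {

    // Hypothetical resource name; e.g. place the file under src/main/resources so it lands in the jar.
    static final String RESOURCE = "spout.properties";

    // Parses a comma-separated "words" entry (hypothetical key) into a list, skipping blanks.
    static List<String> loadWords(InputStream in) throws IOException {
        Properties props = new Properties();
        props.load(in);
        List<String> words = new ArrayList<String>();
        for (String w : props.getProperty("words", "").split(",")) {
            if (!w.trim().isEmpty()) {
                words.add(w.trim());
            }
        }
        return words;
    }

    // What open()/prepare() could call: look the file up on the worker's classpath.
    static List<String> loadWordsFromClasspath() throws IOException {
        try (InputStream in = PropertiesConfigDemo.class.getClassLoader().getResourceAsStream(RESOURCE)) {
            if (in == null) {
                throw new IOException(RESOURCE + " not found on the classpath");
            }
            return loadWords(in);
        }
    }

    public static void main(String[] args) throws IOException {
        String fileContents = "words=nathan, golda, mike\n";
        try (InputStream in = new ByteArrayInputStream(fileContents.getBytes(StandardCharsets.UTF_8))) {
            System.out.println(loadWords(in)); // [nathan, golda, mike]
        }
    }
}
```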

Licensed under: CC-BY-SA with attribution