Using Storm Spout Tuples for triggering execution of further computations at certain times

StackOverflow https://stackoverflow.com/questions/23658046

  •  22-07-2023

Question

I want to connect some Storm Bolts to a TriggerSpout that emits Tuples, let's say, every 3-4 hours or at certain times of the day. When a connected Bolt receives a tuple from this TriggerSpout, it should start further computation on aggregated data (which comes from another Kafka spout); in this case, feature extraction to feed a subsequent MLBolt.

Now, can I somehow make a Spout expose this behavior through its nextTuple() method? Is this the way to go, or can anyone suggest a better approach? It's kind of a global clock for the cluster.

regards'n'thanks
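One way to realize the TriggerSpout idea is to have nextTuple(), which Storm calls repeatedly, check the clock and emit only when a scheduled time of day has passed. The sketch below shows only that decision logic, in plain Java with no Storm classes. The class DailyTrigger and its method shouldFire are hypothetical, not part of Storm, and the sketch assumes a single spout task (with higher parallelism, every spout task would fire).

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;

/**
 * Hypothetical helper for a trigger spout: on each nextTuple() call it decides
 * whether a scheduled time of day has been reached since the previous check.
 */
final class DailyTrigger {
    private final List<LocalTime> fireTimes; // times of day at which to emit
    private LocalDateTime lastCheck;         // when shouldFire() last ran

    DailyTrigger(List<LocalTime> fireTimes, LocalDateTime start) {
        this.fireTimes = List.copyOf(fireTimes);
        this.lastCheck = start;
    }

    /** Returns true if any scheduled time lies in the interval (lastCheck, now]. */
    boolean shouldFire(LocalDateTime now) {
        boolean fire = false;
        // Walk every calendar day between the previous check and now (usually one).
        for (LocalDate day = lastCheck.toLocalDate();
                !day.isAfter(now.toLocalDate());
                day = day.plusDays(1)) {
            for (LocalTime t : fireTimes) {
                LocalDateTime candidate = day.atTime(t);
                if (candidate.isAfter(lastCheck) && !candidate.isAfter(now)) {
                    fire = true;
                }
            }
        }
        lastCheck = now; // remember how far we have looked
        return fire;
    }
}
```

Inside nextTuple() the spout would call shouldFire(LocalDateTime.now()) and emit a trigger tuple only when it returns true, returning immediately otherwise so the call stays non-blocking. Several scheduled times missed between two calls collapse into a single emission in this sketch.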


Solution

Instead of using a TriggerSpout, you could use tick tuples in your bolt: Storm can send each bolt task a special tick tuple at a configured interval. Add the following to your bolt implementation:

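import java.util.Map;

// Package names as in pre-1.0 Storm; Storm 1.0+ uses org.apache.storm.* instead.
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

// Ask Storm to send this bolt a tick tuple every 15 seconds.
// The value is in seconds, so a 3-hour trigger would be 3 * 60 * 60 = 10800.
@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 15);
    return conf;
}

// A tick tuple comes from the system component on the system tick stream.
public static boolean isTickTuple(Tuple tuple) {
    String sourceComponent = tuple.getSourceComponent();
    String sourceStreamId = tuple.getSourceStreamId();
    return Constants.SYSTEM_COMPONENT_ID.equals(sourceComponent)
            && Constants.SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
}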

In your execute() method, first check whether the input is a tick tuple. If it is, run the periodic computation; otherwise, process the tuple as regular data.
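The execute() dispatch can be sketched as follows, assuming the aggregated data fits in memory between ticks. To keep the sketch runnable without Storm on the classpath, SimpleTuple is a hypothetical stand-in for Storm's Tuple, and the strings "__system" and "__tick" are the values of Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID. Regular tuples are buffered; on each tick the bolt computes a placeholder feature (the mean of the buffered values, a hypothetical stand-in for real feature extraction) that would go to the MLBolt, then clears the buffer.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a bolt that aggregates data and acts on tick tuples. */
final class AggregatingBolt {
    /** Hypothetical stand-in for Storm's Tuple: source component, stream, one value. */
    static final class SimpleTuple {
        final String sourceComponent;
        final String sourceStreamId;
        final double value;

        SimpleTuple(String sourceComponent, String sourceStreamId, double value) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
            this.value = value;
        }
    }

    private final List<Double> buffer = new ArrayList<>();  // data since the last tick
    private final List<Double> emitted = new ArrayList<>(); // what would go to the MLBolt

    // Same check as isTickTuple(), using the constants' string values.
    static boolean isTickTuple(SimpleTuple t) {
        return "__system".equals(t.sourceComponent) && "__tick".equals(t.sourceStreamId);
    }

    void execute(SimpleTuple t) {
        if (isTickTuple(t)) {
            // Periodic work: compute a placeholder feature from the buffer, then reset it.
            if (!buffer.isEmpty()) {
                double sum = 0;
                for (double v : buffer) {
                    sum += v;
                }
                emitted.add(sum / buffer.size());
                buffer.clear();
            }
            return;
        }
        buffer.add(t.value); // regular data tuple, e.g. from the Kafka spout
    }

    List<Double> emitted() {
        return emitted;
    }
}
```

A tick that arrives with an empty buffer emits nothing here; whether to emit an "empty" feature instead is a design choice for the downstream MLBolt.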

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow