由于我是 Storm 的新手,我很难理解如何为 Storm 提供值。

我从入门套件开始。我经历了 TestWordSpout 并且以下代码提供了新值

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

所以我看到它一次只取一个词 _collector.emit(new Values(word));

我如何直接提供单词集合。这可能吗?

TestWordSpout.java

我的意思是,当 nextTuple 被调用时,新单词是从列表中随机选择并发出的。一定时间间隔后,随机列表可能看起来像这样

@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels

如果我已经收集了这个列表并将其提供给风暴怎么办?

有帮助吗?

解决方案

“值”类型接受任何类型的对象和任何数字。

因此,您可以简单地从 Bolt 的执行方法或 Spout 的 nextTuple 方法发送一个列表:

List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));

您也可以添加一个新字段,只需确保在declareOutputFields方法中声明它

_collector.emit(new Values(words, "a new field value!");

在你的declareOutputFields方法中

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}

您可以从execute方法给出的元组对象中获取拓扑中下一个Bolt中的字段:

List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");

其他提示

Storm 的设计和构建是为了处理连续的数据流。请参见 风暴的理由. 。输入数据不太可能被输入到 Storm 集群中。一般来说,storm 的输入数据来自 JMS 队列、Apache Kafka 或 Twitter feed 等。我想,您想传递一些配置。在这种情况下,将适用以下内容。

考虑到 Storm 的设计目的,可以将非常有限的配置详细信息传递给 Storm,例如 RDMBS 连接详细信息(Oracle/DB2/MySQL 等)、JMS 提供程序详细信息(IBM MQ/RabbitMQ 等)或 Apache Kafka 详细信息/Hbase 等。

对于您的特定问题或提供上述产品的配置详细信息,我可以想到三种方法

1.在Spout或Bolt实例上设置配置详细信息

例如:声明实例变量并分配值作为 Spout/Bolt 构造函数的一部分,如下所示

    public class TestWordSpout extends BaseRichSpout {
         List<String> listOfValues;

   public TestWordSpout(List<String> listOfValues) {
       this.listOfValues=listOfValues;
   }

}

在拓扑提交类上,使用值列表创建 Spout 实例

        List<String> listOfValues=new ArrayList<String>();
        listOfValues.add("nathan");
        listOfValues.add("golda");
        listOfValues.add("mike");

        builder.setSpout("word", new TestWordSpout(listOfValues), 3);

这些值可用作实例变量 nextTuple() 方法

请查看 Storm 集成: 风暴贡献 上面为 RDBMS/Kafka 等设置的配置

2.设置配置 getComponentConfiguration(). 。此方法用于覆盖拓扑配置,但是,您可以传递一些详细信息,如下所示

 @Override
    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = new HashMap<String, Object>();
        if(!_isDistributed) {
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
             List<String> listOfValues=new ArrayList<String>();
             listOfValues.add("nathan");
             listOfValues.add("golda");
             listOfValues.add("mike");
             ret.put("listOfValues", listOfValues);
        }
        return ret;
    }    

并且配置详细信息可在 open() or prepare() 分别使用 Spout/Bolt 的方法。

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        this.listOfValues=(List<String>)conf.get("listOfValues");        
       }

3.在属性文件中声明配置,并将其作为jar文件的一部分进行打包,提交到Storm集群。Nimbus 节点将 jar 文件复制到工作节点并使其可供执行器线程使用。open()/prepare()方法可以读取属性文件并分配给实例变量。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top