質問

私は嵐のための初心者であるので、嵐に価値を提供する方法を理解しています。

スターターキットから始めました。私はTestWordSpoutを通過し、次のコードが新しい値を提供するという点で

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}
.

だから私はそれが時折1語で1つの単語を取っているのを見ます

私は直接の単語のコレクションを提供することができる方法です。これは可能ですか?

testwordspout.java

NEXTTUPLEと呼ばれているときは、リストからランダムに新しい単語が選択されます。ランダムリストは、特定の時間間隔

の後にこのように見えるかもしれません
@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels
.

すでにこのリストのコレクションを持っていて、暴風雨に餌をやるとどうなりますか。

役に立ちましたか?

解決

"値"タイプ任意の種類のオブジェクトと任意の数を受け入れます。

だから、インスタンスの実行方法からボルトの実行方法またはスパウトのNextTupleメソッドからリストを送信することができます。

List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));
.

新しいフィールドを追加することもできます.DeclareOutputFieldsメソッド

で必ず宣言してください。
_collector.emit(new Values(words, "a new field value!");
.

とあなたのDecleRAreUtputFieldsメソッド

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}
.

EXECUTEメソッドで指定されたタプルオブジェクトからトポロジ内の次のボルトのフィールドを取得できます。

List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");
.

他のヒント

Stormは、連続的なデータストリームを処理するように設計され構築されています。 rationale を参照してください。入力データがStormクラスタにフィードされているのは非常に低いです。一般的に、Stormへの入力データは、JMSキュー、Apache Kafka、Twitterフィードなどからのものです。私は考えています、あなたはいくつかの構成を渡したいと思います。その場合は、次のようになります。

嵐の設計目的を考慮すると、非常に限られた構成の詳細はRDMBS接続の詳細(Oracle / DB2 / MySQLなど)、JMSプロバイダの詳細(IBM MQ / RABBITMQなど)またはApache Kafka Details / HBaseなどのストームに渡すことができます。 。

あなたの特定の質問や上記の製品の構成の詳細を提供するために、私が考えることができる3つの方法があります

1.スパウトまたはボルトのインスタンスの構成の詳細を設定する

たとえば:インスタンス変数を宣言し、以下のようにスパウト/ボルトコンストラクターの一部として値を割り当てます。
    public class TestWordSpout extends BaseRichSpout {
         List<String> listOfValues;

   public TestWordSpout(List<String> listOfValues) {
       this.listOfValues=listOfValues;
   }

}
.

トポロジ送信クラスで、値のリストを持つスパウトのインスタンスを作成します。

        List<String> listOfValues=new ArrayList<String>();
        listOfValues.add("nathan");
        listOfValues.add("golda");
        listOfValues.add("mike");

        builder.setSpout("word", new TestWordSpout(listOfValues), 3);
.

これらの値は、nextTuple()メソッド

のインスタンス変数として使用可能です。

rdbms / kafkaなどの設定で嵐の統合を見てください。上記のように

2. getComponentConfiguration()の設定を設定します。このメソッドはトポロジ構成をオーバーライドするために使用されますが、詳細は以下のように

を渡すことができます。
 @Override
    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = new HashMap<String, Object>();
        if(!_isDistributed) {
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
             List<String> listOfValues=new ArrayList<String>();
             listOfValues.add("nathan");
             listOfValues.add("golda");
             listOfValues.add("mike");
             ret.put("listOfValues", listOfValues);
        }
        return ret;
    }    
.

と構成の詳細は、それぞれ吐き出し/ボルトのopen() or prepare()メソッドで利用可能です。

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        this.listOfValues=(List<String>)conf.get("listOfValues");        
       }
.

3.プロパティファイルの設定とjarファイルの一部として、Stormクラスタに送信されるJARファイルの一部として設定します。 NIMBUSノードはJARファイルをワーカーノードにコピーし、executorスレッドで利用できるようにします。 open()/ prepare()メソッドはプロパティファイルを読み取り、インスタンス変数に割り当てることができます。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top