Domanda

Ho difficoltà a capire come fornire valori da temperare da quando sono un principiante di tempesta.

Ho iniziato con il kit di avviamento.Ho attraversato il TestWordSpout e in quanto il seguente codice fornisce nuovi valori

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}
.

Così vedo che sta prendendo una parola alla volta _collector.emit(new Values(word));

Come posso fornire una raccolta di parole direttamente. Questo è possibile?

Testwordspout.java

Cosa intendo quando NextTuple è chiamato nuove parole è selezionato a caso dall'elenco ed emesso.L'elenco casuale potrebbe sembrare così dopo un intervallo di tempo

@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels
.

Cosa succede se ho già una raccolta di questo elenco e basta darlate a tempesta.

È stato utile?

Soluzione

"Valori" Tipo Accetta qualsiasi tipo di oggetti e qualsiasi numero.

In modo da poter semplicemente inviare un elenco ad esempio dal metodo di esecuzione di un bullone o dal metodo successivo di un beccuccio:

List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));
.

Puoi aggiungere anche un nuovo campo, assicurarsi solo di dichiararlo nel metodo DichiaroOutputfields

_collector.emit(new Values(words, "a new field value!");
.

e nel tuo metodo DichiaratoreOutputfields

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}
.

È possibile ottenere i campi nel bullone successivo nella topologia dall'oggetto della tupla fornito dal metodo EXECUTE:

List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");
.

Altri suggerimenti

Storm è progettato e costruito per elaborare il flusso continuo dei dati. Si prega di vedere Razionale per la tempesta . È molto improbabile che i dati di input siano nutriti nel cluster di tempesta. Generalmente, i dati di input da tempestare sono sia dalle code JMS, Apache Kafka o feed Twitter, ecc. Penserei, vorresti passare poche configurazioni. In tal caso, si applicherebbe quanto segue.

Considerando lo scopo della tempesta di progettazione, i dettagli di configurazione molto limitati possono essere passati a Storm come i dettagli della connessione RDmbs (Oracle / DB2 / MySQL, ecc.), Dettagli del provider JMS (IBM MQ / RabbitMQ ecc.) o Apache Kafka Dettagli / HBASE ecc. .

Per la tua particolare domanda o fornendo i dettagli di configurazione per i prodotti di cui sopra, ci sono tre modi in cui potrei pensare

1.Set I dettagli di configurazione sull'istanza del beccuccio o del bullone

Per ad esempio: dichiarare le variabili di istanza e assegnare i valori come parte del costruttore del beccuccio / bullone come sotto

    public class TestWordSpout extends BaseRichSpout {
         List<String> listOfValues;

   public TestWordSpout(List<String> listOfValues) {
       this.listOfValues=listOfValues;
   }

}
.

Nella classe di invio della topologia, creare un'istanza di beccuccio con l'elenco dei valori

        List<String> listOfValues=new ArrayList<String>();
        listOfValues.add("nathan");
        listOfValues.add("golda");
        listOfValues.add("mike");

        builder.setSpout("word", new TestWordSpout(listOfValues), 3);
.

Questi valori sono disponibili come variabili di istanza nel metodo nextTuple()

Si prega di guardare le integrazioni della tempesta a Storm contrib sulle configurazioni impostati per RDBMS / KAFKA ecc Come sopra

2.Set Le configurazioni nel getComponentConfiguration(). Questo metodo è utilizzato per sovrascrivere le configurazioni Topology, tuttavia, è possibile passare in pochi dettagli come sotto

 @Override
    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = new HashMap<String, Object>();
        if(!_isDistributed) {
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
             List<String> listOfValues=new ArrayList<String>();
             listOfValues.add("nathan");
             listOfValues.add("golda");
             listOfValues.add("mike");
             ret.put("listOfValues", listOfValues);
        }
        return ret;
    }    
.

E i dettagli di configurazione sono disponibili nel metodo open() or prepare() rispettivamente di Becco / Bolt.

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        this.listOfValues=(List<String>)conf.get("listOfValues");        
       }
.

3.Declare le configurazioni nel file di proprietà e nel barattolo come parte del file JAR che verrà inviato al cluster Storm. Il nodo Nimbus copia il file del vaso sui nodi del lavoratore e lo rende disponibile per il thread dell'esecuzione. Il metodo aperto () / prepari () può leggere il file di proprietà e assegnare alla variabile di istanza.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top