Come fornire valori a tempestare il calcolo
-
21-12-2019 - |
Domanda
Ho difficoltà a capire come fornire valori da temperare da quando sono un principiante di tempesta.
Ho iniziato con il kit di avviamento.Ho attraversato il TestWordSpout
e in quanto il seguente codice fornisce nuovi valori
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
.
Così vedo che sta prendendo una parola alla volta _collector.emit(new Values(word));
Come posso fornire una raccolta di parole direttamente. Questo è possibile?
Cosa intendo quando NextTuple è chiamato nuove parole è selezionato a caso dall'elenco ed emesso.L'elenco casuale potrebbe sembrare così dopo un intervallo di tempo
@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels
.
Cosa succede se ho già una raccolta di questo elenco e basta darlate a tempesta.
Soluzione
"Valori" Tipo Accetta qualsiasi tipo di oggetti e qualsiasi numero.
In modo da poter semplicemente inviare un elenco ad esempio dal metodo di esecuzione di un bullone o dal metodo successivo di un beccuccio:
List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));
.
Puoi aggiungere anche un nuovo campo, assicurarsi solo di dichiararlo nel metodo DichiaroOutputfields
_collector.emit(new Values(words, "a new field value!");
.
e nel tuo metodo DichiaratoreOutputfields
@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}
.
È possibile ottenere i campi nel bullone successivo nella topologia dall'oggetto della tupla fornito dal metodo EXECUTE:
List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");
. Altri suggerimenti
Storm è progettato e costruito per elaborare il flusso continuo dei dati. Si prega di vedere Razionale per la tempesta . È molto improbabile che i dati di input siano nutriti nel cluster di tempesta. Generalmente, i dati di input da tempestare sono sia dalle code JMS, Apache Kafka o feed Twitter, ecc. Penserei, vorresti passare poche configurazioni. In tal caso, si applicherebbe quanto segue.
Considerando lo scopo della tempesta di progettazione, i dettagli di configurazione molto limitati possono essere passati a Storm come i dettagli della connessione RDmbs (Oracle / DB2 / MySQL, ecc.), Dettagli del provider JMS (IBM MQ / RabbitMQ ecc.) o Apache Kafka Dettagli / HBASE ecc. .
Per la tua particolare domanda o fornendo i dettagli di configurazione per i prodotti di cui sopra, ci sono tre modi in cui potrei pensare
1.Set I dettagli di configurazione sull'istanza del beccuccio o del bullone
Per ad esempio: dichiarare le variabili di istanza e assegnare i valori come parte del costruttore del beccuccio / bullone come sotto
public class TestWordSpout extends BaseRichSpout {
List<String> listOfValues;
public TestWordSpout(List<String> listOfValues) {
this.listOfValues=listOfValues;
}
}
.
Nella classe di invio della topologia, creare un'istanza di beccuccio con l'elenco dei valori
List<String> listOfValues=new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");
builder.setSpout("word", new TestWordSpout(listOfValues), 3);
.
Questi valori sono disponibili come variabili di istanza nel metodo nextTuple()
Si prega di guardare le integrazioni della tempesta a Storm contrib sulle configurazioni impostati per RDBMS / KAFKA ecc Come sopra
2.Set Le configurazioni nel getComponentConfiguration()
. Questo metodo è utilizzato per sovrascrivere le configurazioni Topology, tuttavia, è possibile passare in pochi dettagli come sotto
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = new HashMap<String, Object>();
if(!_isDistributed) {
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
List<String> listOfValues=new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");
ret.put("listOfValues", listOfValues);
}
return ret;
}
.
E i dettagli di configurazione sono disponibili nel metodo open() or prepare()
rispettivamente di Becco / Bolt.
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.listOfValues=(List<String>)conf.get("listOfValues");
}
.
3.Declare le configurazioni nel file di proprietà e nel barattolo come parte del file JAR che verrà inviato al cluster Storm. Il nodo Nimbus copia il file del vaso sui nodi del lavoratore e lo rende disponibile per il thread dell'esecuzione. Il metodo aperto () / prepari () può leggere il file di proprietà e assegnare alla variabile di istanza.