Как предоставить значения storm для расчета
-
21-12-2019 - |
Вопрос
Мне трудно понять, как предоставить значения storm, поскольку я новичок в storm.
Я начал со стартового набора.Я прошел через TestWordSpout
и в этом следующий код предоставляет новые значения
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
Итак, я вижу, что это происходит по одному слову за раз _collector.emit(new Values(word));
Как я могу предоставить набор слов напрямую.Возможно ли это?
Что я имею в виду, когда вызывается nextTuple, новые слова выбираются случайным образом из списка и выдаются.Случайный список может выглядеть следующим образом через определенный промежуток времени
@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels
Что, если у меня уже есть коллекция из этого списка, и я просто отправлю ее в storm.
Решение
Тип "Значения" принимает объекты любого типа и в любом количестве.
Таким образом, вы можете просто отправить список, например, из метода execute для Bolt или из метода nextTuple для Spout:
List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));
Вы также можете добавить новое поле, просто обязательно объявите его в методе declareOutputFields
_collector.emit(new Values(words, "a new field value!");
И в вашем методе declareOutputFields
@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}
Вы можете получить поля в следующем элементе топологии из объекта tuple, заданного методом execute:
List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");
Другие советы
Storm спроектирован и создан для обработки непрерывного потока данных.Пожалуйста, посмотрите Обоснование шторма.Очень маловероятно, что входные данные будут переданы в кластер storm.Как правило, входные данные для storm поступают либо из очередей JMS, Apache Kafka, либо из каналов Twitter и т.д.Я бы подумал, вы хотели бы передать несколько конфигураций.В этом случае будет применяться следующее.
Учитывая цель разработки Storm, в Storm могут быть переданы очень ограниченные сведения о конфигурации, такие как сведения о подключении RDMBS (Oracle/DB2/MySQL и т.д.), сведения о поставщике JMS (IBM MQ/RabbitMQ и т.д.) или Apache Kafka details/Hbase и т.д.
Что касается вашего конкретного вопроса или предоставления сведений о конфигурации для вышеуказанных продуктов, есть три способа, о которых я мог бы подумать
1.Установите параметры конфигурации для экземпляра носика или болта
Например, для:Объявите переменные экземпляра и присвойте значения как часть конструктора Spout/Bolt, как показано ниже
public class TestWordSpout extends BaseRichSpout {
List<String> listOfValues;
public TestWordSpout(List<String> listOfValues) {
this.listOfValues=listOfValues;
}
}
В классе отправки топологии создайте экземпляр Spout со списком значений
List<String> listOfValues=new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");
builder.setSpout("word", new TestWordSpout(listOfValues), 3);
Эти значения доступны в качестве переменных экземпляра в nextTuple()
метод
Пожалуйста, ознакомьтесь с интеграциями Storm по адресу Штормовой вклад в конфигурациях, установленных для СУБД/Kafka и т.д., как указано выше
2.Установите конфигурации в getComponentConfiguration()
.Этот метод используется для переопределения конфигураций топологии, однако вы могли бы указать несколько деталей, как показано ниже
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = new HashMap<String, Object>();
if(!_isDistributed) {
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
List<String> listOfValues=new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");
ret.put("listOfValues", listOfValues);
}
return ret;
}
а подробные сведения о конфигурации доступны в разделе open() or prepare()
способ крепления носика/болта соответственно.
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.listOfValues=(List<String>)conf.get("listOfValues");
}
3.Объявите конфигурации в файле свойств и сохраните их как часть файла jar, который будет отправлен в кластер Storm.Узел Nimbus копирует jar-файл на рабочие узлы и делает его доступным для потока-исполнителя.Метод open()/prepare() может считывать файл свойств и присваивать переменной экземпляра.