Как предоставить значения storm для расчета

StackOverflow https://stackoverflow.com//questions/22012008

  •  21-12-2019
  •  | 
  •  

Вопрос

Мне трудно понять, как предоставить значения storm, поскольку я новичок в storm.

Я начал со стартового набора.Я прошел через TestWordSpout и в этом следующий код предоставляет новые значения

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

Итак, я вижу, что это происходит по одному слову за раз _collector.emit(new Values(word));

Как я могу предоставить набор слов напрямую.Возможно ли это?

TestWordSpout.java

Что я имею в виду, когда вызывается nextTuple, новые слова выбираются случайным образом из списка и выдаются.Случайный список может выглядеть следующим образом через определенный промежуток времени

@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels

Что, если у меня уже есть коллекция из этого списка, и я просто отправлю ее в storm.

Это было полезно?

Решение

Тип "Значения" принимает объекты любого типа и в любом количестве.

Таким образом, вы можете просто отправить список, например, из метода execute для Bolt или из метода nextTuple для Spout:

List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));

Вы также можете добавить новое поле, просто обязательно объявите его в методе declareOutputFields

_collector.emit(new Values(words, "a new field value!");

И в вашем методе declareOutputFields

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}

Вы можете получить поля в следующем элементе топологии из объекта tuple, заданного методом execute:

List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");

Другие советы

Storm спроектирован и создан для обработки непрерывного потока данных.Пожалуйста, посмотрите Обоснование шторма.Очень маловероятно, что входные данные будут переданы в кластер storm.Как правило, входные данные для storm поступают либо из очередей JMS, Apache Kafka, либо из каналов Twitter и т.д.Я бы подумал, вы хотели бы передать несколько конфигураций.В этом случае будет применяться следующее.

Учитывая цель разработки Storm, в Storm могут быть переданы очень ограниченные сведения о конфигурации, такие как сведения о подключении RDMBS (Oracle/DB2/MySQL и т.д.), сведения о поставщике JMS (IBM MQ/RabbitMQ и т.д.) или Apache Kafka details/Hbase и т.д.

Что касается вашего конкретного вопроса или предоставления сведений о конфигурации для вышеуказанных продуктов, есть три способа, о которых я мог бы подумать

1.Установите параметры конфигурации для экземпляра носика или болта

Например, для:Объявите переменные экземпляра и присвойте значения как часть конструктора Spout/Bolt, как показано ниже

    public class TestWordSpout extends BaseRichSpout {
         List<String> listOfValues;

   public TestWordSpout(List<String> listOfValues) {
       this.listOfValues=listOfValues;
   }

}

В классе отправки топологии создайте экземпляр Spout со списком значений

        List<String> listOfValues=new ArrayList<String>();
        listOfValues.add("nathan");
        listOfValues.add("golda");
        listOfValues.add("mike");

        builder.setSpout("word", new TestWordSpout(listOfValues), 3);

Эти значения доступны в качестве переменных экземпляра в nextTuple() метод

Пожалуйста, ознакомьтесь с интеграциями Storm по адресу Штормовой вклад в конфигурациях, установленных для СУБД/Kafka и т.д., как указано выше

2.Установите конфигурации в getComponentConfiguration().Этот метод используется для переопределения конфигураций топологии, однако вы могли бы указать несколько деталей, как показано ниже

 @Override
    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> ret = new HashMap<String, Object>();
        if(!_isDistributed) {
            ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
            return ret;
        } else {
             List<String> listOfValues=new ArrayList<String>();
             listOfValues.add("nathan");
             listOfValues.add("golda");
             listOfValues.add("mike");
             ret.put("listOfValues", listOfValues);
        }
        return ret;
    }    

а подробные сведения о конфигурации доступны в разделе open() or prepare() способ крепления носика/болта соответственно.

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        this.listOfValues=(List<String>)conf.get("listOfValues");        
       }

3.Объявите конфигурации в файле свойств и сохраните их как часть файла jar, который будет отправлен в кластер Storm.Узел Nimbus копирует jar-файл на рабочие узлы и делает его доступным для потока-исполнителя.Метод open()/prepare() может считывать файл свойств и присваивать переменной экземпляра.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top