문제
나는 폭풍에 대한 초보자이기 때문에 폭풍에 가치를 제공하는 방법을 이해하는 데 어려움이 있습니다.
나는 스타터 키트로 시작했습니다.나는 TestWordSpout
를 통과했고, 다음 코드는 새로운 값을 제공한다
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
.
한 번에 한 단어를 찍는 것이 뭔가를 봅니다.
어떻게 단어의 컬렉션을 제공 할 수 있습니다.이를 가능하게합니다.
Nextletle이라고하는 것은 목록에서 임의로 임의로 선택되고 방출됩니다.임의의 목록은 특정 시간 간격 후에이 것처럼 보일 수 있습니다
@100ms: nathan
@200ms: golda
@300ms: golda
@400ms: jackson
@500ms: mike
@600ms: nathan
@700ms: bertels
.
이미이 목록의 컬렉션을 가지고 있고 폭풍에 빠지면 어떨까요?
해결책
"값"유형은 모든 종류의 객체와 모든 숫자를 허용합니다.
이므로 볼트의 실행 방법 또는 스파우트의 다음 단계에서 예를 들어 목록을 보낼 수 있습니다.
List<String> words = new ArrayList<>();
words.add("one word");
words.add("another word");
_collector.emit(new Values(words));
.
새 필드를 추가 할 수 있습니다.
_collector.emit(new Values(words, "a new field value!");
.
및 DeclareOutputFields 방법
@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("collection", "newField"));
}
.
execute 메소드가 주어진 튜플 객체의 토폴로지의 다음 볼트에서 필드를 가져올 수 있습니다.
List<String> collection = (List<String>) tuple.getValueByField("collection");
String newFieldValue = tuple.getStringByField("newField");
. 다른 팁
Storm은 지속적인 데이터 스트림을 처리하기 위해 설계되고 구축되었습니다. 폭풍 에 대한 이론적 근거를 참조하십시오. 입력 데이터가 폭풍우 클러스터에 공급되는 것은 당연하지는 않습니다. 일반적으로 입력 데이터는 JMS 대기열, Apache Kafka 또는 Twitter Feed 등의 JMS 대기열에서 몇 가지 구성을 전달하고자합니다. 이 경우 다음 사항이 적용됩니다.
폭풍 설계 목적을 고려할 때, 매우 제한된 구성 세부 사항은 RDMBS 연결 세부 사항 (Oracle / DB2 / MySQL 등), JMS 제공자 세부 정보 (IBM MQ / RABBITMQ 등) 또는 Apache Kafka 세부 정보 / HBase 등과 같은 폭풍에 전달 될 수 있습니다. .
특정 질문을하거나 위의 제품에 대한 구성 세부 사항을 제공하는 경우, 내가 생각할 수있는 세 가지 방법이 있습니다
1. 스파우트 또는 볼트의 인스턴스에서 구성 세부 정보를 설정합니다
예를 들어 : 인스턴스 변수를 선언하고 아래로 값을 Spout / Bolt 생성자의 일부로 할당
public class TestWordSpout extends BaseRichSpout {
List<String> listOfValues;
public TestWordSpout(List<String> listOfValues) {
this.listOfValues=listOfValues;
}
}
.
토폴로지 제출 클래스에서 값 목록이있는 스파우트의 인스턴스를 만듭니다.
List<String> listOfValues=new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");
builder.setSpout("word", new TestWordSpout(listOfValues), 3);
.
이 값은 nextTuple()
메소드에서 인스턴스 변수로 사용할 수 있습니다
Storm Contrib "rdbms / kafka 등의 구성에 대한 폭풍우 에서 폭풍 통합을보십시오. 위의
와 같이 2. getComponentConfiguration()
에서 구성을 설정합니다. 이 메소드는 토폴로지 구성을 무시하는 데 사용되지만 아래에서 몇 가지 세부 정보를 전달할 수 있습니다
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = new HashMap<String, Object>();
if(!_isDistributed) {
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
List<String> listOfValues=new ArrayList<String>();
listOfValues.add("nathan");
listOfValues.add("golda");
listOfValues.add("mike");
ret.put("listOfValues", listOfValues);
}
return ret;
}
.
및 구성 세부 사항은 각각 Spout / Bolt의 open() or prepare()
방법에서 사용할 수 있습니다.
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.listOfValues=(List<String>)conf.get("listOfValues");
}
.
3.Declare 속성 파일의 구성 및 Jar 파일의 일부로서 스톰 클러스터에 제출되는 JAR 파일의 일부로 jar. NIMBUS 노드는 JAR 파일을 작업자 노드에 복사하고 실행 프로그램에 사용할 수있게합니다. open () / prepare () 메서드는 속성 파일을 읽고 인스턴스 변수에 할당 할 수 있습니다.