Question

I am trying to run the WordCount example with guaranteed message processing.

There is one spout

  1. WSpout - emits random sentences, each with a msgID.

and two bolts

  1. SplitSentence - splits each sentence into words and emits them with anchoring

  2. WordCount - prints the word counts.

What I want to achieve with the code below is that once all the words of a sentence have been counted, the spout is acknowledged for the tuple corresponding to that sentence.

I am acknowledging with _collector.ack(tuple) only in the last bolt, WordCount. The strange thing is that even though ack() is being called in WordCount.execute(), the corresponding WSpout.ack() is never called; the tuple always fails after the default timeout.

I really don't understand what's wrong with the code. Please help me understand the problem. Any help is appreciated.

Below is the complete code.

public class TestTopology {

    /**
     * Spout that emits one randomly chosen sentence per {@link #nextTuple()} call.
     *
     * <p>Every tuple is emitted with a message id (a per-instance counter), so the
     * framework reports the outcome of each sentence back through
     * {@link #ack(Object)} or {@link #fail(Object)}.
     */
    public static class WSpout implements IRichSpout {
        /** Fixed pool of sentences to pick from; built once instead of on every call. */
        private static final String[] SENTENCES = new String[] {
                "There two things benefit",
                " from Storms reliability capabilities",
                "Specifying a link in the",
                " tuple tree is " + "called anchoring",
                " Anchoring is done at ",
                "the same time you emit a " + "new tuple" };

        SpoutOutputCollector _collector;
        Integer msgID = 0;

        /** Random source, created once in {@link #open} rather than per emitted tuple. */
        private Random _rand;

        /**
         * Emits one random sentence tagged with the current message id, logs it,
         * and then advances the id for the next tuple.
         */
        @Override
        public void nextTuple() {
            String message = SENTENCES[_rand.nextInt(SENTENCES.length)];
            _collector.emit(new Values(message), msgID);
            System.out.println(msgID + " " + message);

            msgID++;
        }

        /**
         * Stores the collector used for emitting and creates the random source.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used by {@link #nextTuple()} to emit tuples
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            System.out.println("open");
            _collector = collector;
            // Created here, not at field declaration, so each opened instance
            // gets its own independently seeded generator.
            _rand = new Random();
        }

        /** Declares the single output field, {@code "LINE"}, carrying the sentence. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("LINE"));
        }

        /**
         * Logs the message id of a tuple reported as acknowledged.
         *
         * @param msgID the id that was passed to {@code emit} for that tuple
         */
        @Override
        public void ack(Object msgID) {
            System.out.println("ack ------------------- " + msgID);
        }

        /**
         * Logs the message id of a tuple reported as failed.
         *
         * @param msgID the id that was passed to {@code emit} for that tuple
         */
        @Override
        public void fail(Object msgID) {
            System.out.println("fail ----------------- " + msgID);
        }

        /** No-op: nothing to do on activation. */
        @Override
        public void activate() {
        }

        /** No-op: this spout holds no resources that need releasing. */
        @Override
        public void close() {
        }

        /** No-op: nothing to do on deactivation. */
        @Override
        public void deactivate() {
        }

        /**
         * Returns no component-specific configuration.
         *
         * @return always {@code null}
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

public static class SplitSentence extends BaseRichBolt {
    OutputCollector _collector;
    public void prepare(Map conf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            System.out.println(word);
            _collector.emit(tuple, new Values(word));
        }
        //_collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

/**
 * Bolt that keeps a running, in-memory count per word and emits the updated
 * {@code (word, count)} pair for every word it receives.
 */
public static class WordCount extends BaseBasicBolt {
    /** Occurrences seen so far by this bolt instance, keyed by word. */
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Increments the count of the word in field 0, logs it, stores it and emits
     * the word together with its new count.
     *
     * @param tuple     input tuple whose first field is the word
     * @param collector collector used to emit the {@code (word, count)} pair
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println("WordCount MSGID : " + tuple.getMessageId());

        String word = tuple.getString(0);
        Integer seenSoFar = counts.get(word);
        // A word seen for the first time starts at 1.
        Integer updated = (seenSoFar == null) ? 1 : seenSoFar + 1;

        System.out.println(word + " ===> " + updated);
        counts.put(word, updated);
        collector.emit(new Values(word, updated));
    }

    /** Declares the two output fields, {@code "word"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

}

/**
 * Wires the spout and the two bolts into a topology and runs it.
 *
 * <p>With at least one argument, the topology is submitted under the name
 * {@code args[0]} via {@code StormSubmitter}. Without arguments it runs in a
 * {@code LocalCluster} for ten seconds and is then shut down.
 *
 * @param args optional; {@code args[0]} is the topology name for submission
 * @throws Exception if submission or the local run fails
 */
public static void main(String[] args) throws Exception {
    TopologyBuilder topology = new TopologyBuilder();
    topology.setSpout("spout", new WSpout(), 2);
    topology.setBolt("split", new SplitSentence(), 2)
            .shuffleGrouping("spout");
    // Group by word so the same word always reaches the same counter task.
    topology.setBolt("count", new WordCount(), 2)
            .fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    boolean runLocally = (args == null || args.length == 0);
    if (runLocally) {
        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, topology.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
        return;
    }

    conf.setNumWorkers(1);
    StormSubmitter.submitTopology(args[0], conf, topology.createTopology());
}
}
Was it helpful?

Solution

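WordCount extends BaseBasicBolt, which ensures the tuples are acked automatically IN THAT BOLT, like you stated in your comment. However, SplitSentence extends BaseRichBolt, which requires you to ack tuples manually. You're not acking the input tuple there, so the tuple tree never completes and the tuples time out. Call _collector.ack(tuple) at the end of SplitSentence.execute(), after emitting the words.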

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top