Question

I have started using storm, so I create simple topology using this tutorial

When I run my topology with LocalCluster and all seem fine, My Problem is that I'm not getting ACK on the tuple, meaning my spout ack is never called.

my code is below - do you know why ack is not called ?

so my topology look like this

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

My Spout look like this

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

and my bolt look like this

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, 
                    OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}
Was it helpful?

Solution

Your emit() method in the spout has only one argument, so that tuple isn't anchored. That's why you're not getting a call back to the ack() method in the spout even though you're ack'ing the tuple in the bolt.

To get this to work, you need to modify your spout to emit a second argument which is the message id. It is this id that's passed back to the ack() method in the spout:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6), msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top