Question

I have the following code that retrieves messages from an SQS queue. I am using an AmazonSQSBufferedAsyncClient to retrieve the messages. A fixed-delay single-threaded executor wakes up every 5 minutes and calls receiveMessage. Long polling is enabled on the queue.

@Service
public class AmazonQueueService
    implements QueueService<String> {

    @Autowired
    private AmazonSQSBufferedAsyncClient sqsAsyncClient;

    @Value("${aws.sqs.queueUrl}")
    private String queueUrl;

    @Override
    public List<Message<String>> receiveMessage() {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
        ReceiveMessageResult result = sqsAsyncClient.receiveMessage(receiveMessageRequest);

        LOG.debug("Size=" + result.getMessages().size());

        return Lists.transform(result.getMessages(), ......);
    }

    .....
}

The problem is that when I check the AWS console, the message is always in flight, but the application never receives it (Size is always printed as 0). It looks like the AmazonSQSBufferedAsyncClient reads the message from the queue but does not return it from the receiveMessage call.

Any ideas?

Solution

Finally figured this out. The problem comes from the combination of the queue's visibility timeout (2 minutes) and the scheduled executor's delay (5 minutes).

Increasing the visibility timeout to 15 minutes solved the problem.

My theory: the AmazonSQSBufferedAsyncClient picks the message up from the queue almost immediately and keeps it in its buffer, waiting for a receiveMessage call. Because the executor delay is 5 minutes, the message's visibility timeout expires before receiveMessage is called, and the message is returned to the queue. For whatever reason, a later call to receiveMessage then does not return the message. Increasing the timeout gives the receiveMessage call a chance to happen before the timeout expires, which I guess is why it solved the problem.
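
To make the timing argument concrete, here is a minimal sketch of the check implied by that theory. It is a toy model and does not call SQS: the class and method names (PrefetchTiming, expiresBeforeNextPoll) are made up for illustration, and it assumes the worst case where the buffer prefetches a message right after the previous poll.

```java
import java.time.Duration;

// Toy model of the timing race described above; it does not talk to SQS.
final class PrefetchTiming {
    private PrefetchTiming() {}

    /**
     * Returns true when a message prefetched right after one poll would have its
     * visibility timeout expire before the next scheduled poll happens.
     */
    static boolean expiresBeforeNextPoll(Duration visibilityTimeout, Duration pollDelay) {
        return visibilityTimeout.compareTo(pollDelay) < 0;
    }

    public static void main(String[] args) {
        Duration pollDelay = Duration.ofMinutes(5);
        // Original settings: 2-minute timeout, 5-minute delay.
        System.out.println(expiresBeforeNextPoll(Duration.ofMinutes(2), pollDelay));  // true
        // After the fix: 15-minute timeout, 5-minute delay.
        System.out.println(expiresBeforeNextPoll(Duration.ofMinutes(15), pollDelay)); // false
    }
}
```

Under this model, any visibility timeout shorter than the polling delay can leave a buffered message expired by the time the application asks for it, which is consistent with the 15-minute setting making the symptom go away.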

Any other possible explanation?

OTHER TIPS

In my case, a Lambda function was reading the messages before I could see them in the console, so polling for messages from the SQS console returned nothing.

You have to delete the message from the queue when you're done with it. If you don't, it's going to stay in-flight until it times out and then go right back to the queue. It is designed this way so you will never lose messages. If your program crashes before it finishes handling the message and deleting it, the message will go right back to the queue.

From the basic Java example (SampleDriver.java):

QMessage message = messages.get(0);

System.out.println("\nMessage received");
System.out.println("  message id:     " + message.getId());
System.out.println("  receipt handle: " + message.getReceiptHandle());
System.out.println("  message content: " + message.getContent());

testQueue.deleteMessage(message.getReceiptHandle()); // <===== here
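
The receive, time out, reappear cycle described above can be illustrated with a small in-memory toy. This is only a sketch of the behaviour, not the SQS API: ToyQueue and its methods are hypothetical, time is a simulated counter, and it deletes by message body where real SQS deletes by receipt handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// In-memory toy that mimics receive / delete / visibility timeout. Not the SQS API.
final class ToyQueue {
    private static final class Entry {
        final String body;
        long invisibleUntil; // the message is hidden while now < invisibleUntil
        Entry(String body) { this.body = body; }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final long visibilityTimeoutSeconds;
    private long now = 0; // simulated clock, in seconds

    ToyQueue(long visibilityTimeoutSeconds) {
        this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
    }

    void send(String body) { entries.add(new Entry(body)); }

    void advance(long seconds) { now += seconds; }

    /** Returns the first visible message and hides it (puts it "in flight"). */
    Optional<String> receive() {
        for (Entry e : entries) {
            if (now >= e.invisibleUntil) {
                e.invisibleUntil = now + visibilityTimeoutSeconds;
                return Optional.of(e.body);
            }
        }
        return Optional.empty();
    }

    /** Removes the message for good; without this it becomes visible again. */
    void delete(String body) {
        entries.removeIf(e -> e.body.equals(body));
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue(120);
        queue.send("order-1");
        System.out.println(queue.receive()); // Optional[order-1]
        System.out.println(queue.receive()); // Optional.empty (message is in flight)
        queue.advance(120);
        System.out.println(queue.receive()); // Optional[order-1] again, it was never deleted
        queue.delete("order-1");
        queue.advance(120);
        System.out.println(queue.receive()); // Optional.empty, the message is gone
    }
}
```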

In my case, I was using short polling, and a receive call sometimes returned more than one message while I processed only one, so I lost track of the rest. Because those extra messages were never processed or deleted, they showed up as in-flight messages until their visibility timeout expired.
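
One way to avoid dropping the extra messages is to loop over the whole batch and acknowledge each message only after it has been handled. The helper below is a generic sketch with hypothetical names (BatchHandling, handleAll). The acknowledge callback stands in for whatever delete call your client provides, and no SQS classes are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class BatchHandling {
    private BatchHandling() {}

    /**
     * Handles every message in a received batch and acknowledges each one only
     * after its handler returns normally. Returns the number acknowledged.
     */
    static <M> int handleAll(List<M> batch, Consumer<M> handler, Consumer<M> acknowledge) {
        int done = 0;
        for (M message : batch) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // Leave the message unacknowledged so it can be redelivered later.
                continue;
            }
            acknowledge.accept(message);
            done++;
        }
        return done;
    }

    public static void main(String[] args) {
        List<String> deleted = new ArrayList<>();
        int count = handleAll(Arrays.asList("a", "b", "c"), m -> { }, deleted::add);
        System.out.println(count + " " + deleted); // 3 [a, b, c]
    }
}
```

With a real client, the acknowledge callback would typically issue the delete using the message's receipt handle, as in the SampleDriver.java snippet above.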

Licensed under: CC-BY-SA with attribution