Question

A project I'm working on requires the reading of messages from SQS, and I decided to use Akka to distribute the processing of these messages.

Since SQS is support by Camel, and there is functionality built in for use in Akka in the Consumer class, I imagined it would be best to implement the endpoint and read messages this way, though I had not seen many examples of people doing so.

My problem is that I cannot poll my queue quickly enough to keep my queue empty, or near empty. What I originally thought was that I could get a Consumer to receive messages over Camel from SQS at a rate of X/s. From there, I could simply create more Consumers to get up to the rate at which I needed messages processed.

My Consumer:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

As shown, I've set delay=1 as well as &maxMessagesPerPoll=10 to improve the rate of messages, but I'm unable to spawn multiple consumers with the same endpoint.

I read in the docs that By default endpoints are assumed not to support multiple consumers. and I believe this holds true for SQS endpoints as well, as spawning multiple consumers will give me only one consumer where after running the system for a minute, the output message is Count for actor: x instead of the others which output Count for actor: 0.

If this is at all useful; I'm able to read approximately 33 messages/second with this current implementation on the single consumer.

Is this the proper way to be reading messages from an SQS queue in Akka? If so, is there way I can get this to scale outward so that I can increase my rate of message consumption closer to that of 900 messages/second?

Was it helpful?

Solution

Sadly Camel does not currently support parallel consumption of messages on SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

To address this I've written my own Actor to poll batch messages SQS using the aws-java-sdk.

  def receive = {
    case BeginPolling => {
      // re-queue sending asynchronously
      self ! BeginPolling
      // traverse the response
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach {
        node => {
          deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
          //log.info("Node body: {}", node.getBody)
          filterSupervisor ! node.getBody
        }
      }
      if(deleteEntryList.size() > 0){
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }

    case _ => {
      log.warning("Unknown message")
    }
  }

Though I'm not certain if this is the best implementation, and it could of course be improved upon so that requests are not constantly hitting an empty queue, it does suit my current needs of being able to poll messages from the same queue.

Getting about 133 (messages/second)/actor from SQS with this.

OTHER TIPS

Camel 2.15 supports concurrentConsumers, though not sure how useful this is in that I don't know if akka camel support 2.15 and I do not know if having one Consumer actor makes a difference even if there are multiple consumers.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top