Question

I am trying to use the HornetQ core API's scheduled-message support to deliver a message after 30 seconds. The queue is not durable and is defined in the HornetQ configuration file.

val message: String = {...} // some string
val clientMessage: ClientMessage = session.createMessage(false)
clientMessage.getBodyBuffer.writeString(message)
// expecting the message to be delivered after 30 seconds
clientMessage.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 30000)
producer.send(MyQueue, clientMessage)

However, when I look at the logs, the message appears to be sent and received within the same second. Should I define anything else? Am I missing something?

Adding code:

class HornetQMessageTest extends FunSuite with ShouldMatchers {
test("scheduleMessage") {
    def createServerLocator: ServerLocator = {
      var map = new java.util.HashMap[String, Object]
      map.put("host",  "127.0.0.1")
      map.put("port", "5445")
      val transConf = new TransportConfiguration(classOf[NettyConnectorFactory].getName,map)
      val locator = HornetQClient.createServerLocatorWithoutHA(transConf)
      locator.setConfirmationWindowSize(1024 * 1024) // confirmationWindowSize; in Scala ^ is bitwise XOR, so 1024^2 is 1026
      locator.setBlockOnDurableSend(false)
      locator.setBlockOnNonDurableSend(false)
      locator.setClientFailureCheckPeriod(5000) //keepAlivePing
      locator.setConnectionTTL(10000) //connection TTL
      locator
    }
    val serverLocator: ServerLocator = createServerLocator
    val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory()
    val receiverSession = sessionFactory.createSession(true, true, 0)
    val senderSession =sessionFactory.createSession(true, true, 0)
    val queue = "atestq"
    def closeHorentQClient() {
      receiverSession.close()
      senderSession.close()
      sessionFactory.close()
      serverLocator.close()
    }
    senderSession should not be null
    receiverSession should not be null
    val query: QueueQuery = senderSession.queueQuery(new SimpleString(queue))
    if (query == null || !query.isExists) senderSession.createQueue(queue,queue,false)
    val producer: ClientProducer = senderSession.createProducer(queue)
    val message = senderSession.createMessage(false)
    message.getBodyBuffer().writeString("This is my string........")

    val deliverytime = System.currentTimeMillis() + 30000
    message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliverytime)
    println("Message Sent "+deliverytime)
    senderSession.start()
    producer.send(message)
    receiverSession.start()
    val consumer = receiverSession.createConsumer(queue)
    val message2 = consumer.receive(50000)
    val messageBody = message2.getBodyBuffer().readString()
    println("received message: "+messageBody +" after" + ((System.currentTimeMillis()-deliverytime)/1000)+" seconds ")
    message2 should not be  null
    System.currentTimeMillis() should be >= deliverytime
    assert(messageBody == "This is my string........")

    message2.acknowledge()

    // Make sure no more messages
    closeHorentQClient()
}  }

test results:

Message Sent 1392198959686
received message: This is my string........ after-29 seconds 

1392198929758 was not greater than or equal to 1392198959686

Solution

We use a ScheduledExecutor, and on some operating systems a scheduler.schedule(in 30 seconds) call runs our Runnable sooner than it should. I have seen this happen on Windows quite a lot.

Make sure you understand that the scheduled time is evaluated against the server's clock, not the client's. Keep your client's clock in sync with your server's.
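To illustrate why the server's clock matters, here is a minimal Java sketch with made-up numbers. `ScheduledDeliveryTime`, `forDelay` and the offset parameter are hypothetical names, not part of HornetQ, and how you would estimate the offset between the two clocks is left open.

```java
// Hypothetical helper for illustration only; not part of the HornetQ API.
final class ScheduledDeliveryTime {

    /**
     * Computes an absolute delivery timestamp expressed in the server's clock.
     *
     * @param clientNowMillis         current reading of the client clock
     * @param serverMinusClientMillis estimated offset (server clock minus client clock);
     *                                an assumption, you would have to measure it yourself
     * @param delayMillis             how long the server should hold the message
     */
    static long forDelay(long clientNowMillis, long serverMinusClientMillis, long delayMillis) {
        return clientNowMillis + serverMinusClientMillis + delayMillis;
    }

    public static void main(String[] args) {
        long clientNow = 1_000_000L;
        long offset = 45_000L;               // pretend the server clock is 45 s ahead
        long serverNow = clientNow + offset;

        // Client clock only: the timestamp is already in the past for the server.
        long naive = forDelay(clientNow, 0L, 30_000L);
        // Offset applied: the server sees a timestamp 30 s in its future.
        long corrected = forDelay(clientNow, offset, 30_000L);

        System.out.println("naive, relative to server now:     " + (naive - serverNow) + " ms");
        System.out.println("corrected, relative to server now: " + (corrected - serverNow) + " ms");
    }
}
```

With these numbers the naive timestamp lies 15 seconds in the server's past, so a server that compares against its own clock would have no reason to hold the message.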

We have recently made a lot of improvements in 2.4.0, and I don't think you would hit this issue again, as we now validate the time ourselves and no longer trust the ScheduledExecutor.

If you update your OS you won't see this issue. It is usually a kernel issue around real time, where waits are not respected. Alternatively, you could use 2.4.0, which no longer trusts this behaviour.
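The idea of validating the time rather than trusting the executor can be sketched in plain Java as below. This is only an illustration under my own assumptions, not HornetQ's actual implementation; `ValidatedScheduler`, `scheduleAt` and `latenessOf` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: re-check the clock when the executor fires, and re-arm if it fired early.
final class ValidatedScheduler {
    private final ScheduledExecutorService executor;

    ValidatedScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /** Runs the task no earlier than deliveryTimeMillis, measured with System.currentTimeMillis(). */
    void scheduleAt(long deliveryTimeMillis, Runnable task) {
        long delay = Math.max(0L, deliveryTimeMillis - System.currentTimeMillis());
        executor.schedule(() -> {
            if (System.currentTimeMillis() < deliveryTimeMillis) {
                // The executor woke us too early: schedule again for the remaining time.
                scheduleAt(deliveryTimeMillis, task);
            } else {
                task.run();
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /** Demo helper: schedules one task and returns how many ms after the target time it ran. */
    static long latenessOf(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long deliveryTime = System.currentTimeMillis() + delayMillis;
            AtomicLong firedAt = new AtomicLong(-1L);
            CountDownLatch done = new CountDownLatch(1);
            new ValidatedScheduler(executor).scheduleAt(deliveryTime, () -> {
                firedAt.set(System.currentTimeMillis());
                done.countDown();
            });
            if (!done.await(delayMillis + 5_000L, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
            return firedAt.get() - deliveryTime;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran " + latenessOf(500L) + " ms after the target time");
    }
}
```

The task can still run late, but it should never run before the target timestamp, which is the property the failing assertion in the question checks.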

I tried the following code, which is pretty much your test converted to Java, using 2.2.eap5 on a Mac without the 2.4.0 patch, and it worked.

   public void testSomething() throws Exception
   {
      // then we create a client as normal
      ClientSessionFactory sessionFactory = createSessionFactory(locator);
      ClientSession receiverSession = sessionFactory.createSession(true, true, 0);
      ClientSession senderSession = sessionFactory.createSession(true, true, 0);
      String queue = "atestq";

      ClientSession.QueueQuery query = senderSession.queueQuery(new SimpleString(queue));
      if (query == null || !query.isExists()) senderSession.createQueue(queue, queue, false);

      ClientProducer producer = senderSession.createProducer(queue);
      ClientMessage message = senderSession.createMessage(false);
      message.getBodyBuffer().writeString("This is my string........");

      long original = System.currentTimeMillis();
      long deliverytime = System.currentTimeMillis() + 30000;
      message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliverytime);
      System.out.println("Message Sent " + deliverytime);
      senderSession.start();
      producer.send(message);
      receiverSession.start();
      ClientConsumer consumer = receiverSession.createConsumer(queue);
      ClientMessage message2 = consumer.receive(50000);
      String messageBody = message2.getBodyBuffer().readString();
      System.out.println("received message: " + messageBody + " after" + ((System.currentTimeMillis() - original) / 1000) + " seconds ");
      assert (messageBody.equals("This is my string........"));

      message2.acknowledge();
   }

I have just raised a feature request within HornetQ for this.

Licensed under: CC-BY-SA with attribution