Question

I am working on a real-time, event-based application using Spring WebSockets, Messaging, and RabbitMQ. In this application, messages need to be delivered to clients in the exact order they were inserted into RabbitMQ.

[EDITED]

Our goal is to receive a message from a browser, process it on the server in order (against a unique object determined by the route parameter), enrich the message, and broadcast it to all subscribing browsers via an external STOMP MQ (RabbitMQ).

Our MessageMapping method is as follows:

@MessageMapping("/commands.{route}.{data}")
public CommandMessage receiveCommand(CommandMessage message, Principal principal) {

  try {

    // Get object to synch on using route
    Object o = ...

    synchronized (o) {

      // Perform command on object

      // Set message server sequence
      message.setServerSequence(o.getAutoIncrementSequence());

      // Log server sequence
      log.debug("Message server sequence:" + message.getServerSequence());

      // Send to external MQ for broadcasting to all subscribers
      return message;
    }

  } catch (Exception e) {
    ...
  }

  return null;

}

If we configure the clientInboundChannel and clientOutboundChannel with a corePoolSize and maxPoolSize of 1 each, all messages are in order.

If we increase the corePoolSize and maxPoolSize on the clientInboundChannel, messages reach the MQ in the wrong order; if we do the same for the clientOutboundChannel, messages reach the browser in the wrong order.

Tests were done using a single browser client.

We turned on tracing for StompBrokerRelayMessageHandler and received log entries like:

2014-05-06 14:26:39 TRACE [clientInboundChannel-6] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Processing message=[Payload byte[303]][Headers={stompCommand=SEND, nativeHeaders={content-type=[application/json;charset=UTF-8], destination=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset=UTF-8, simpSessionId=ehjcoxb3, id=a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb, timestamp=1399400799940}]
2014-05-06 14:26:39 DEBUG [clientInboundChannel-6] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] Forwarding message to broker
2014-05-06 14:26:39 TRACE [clientInboundChannel-3] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] Ignoring message to destination=/app/commands.BKN01.20140318
2014-05-06 14:26:39 TRACE [clientInboundChannel-7] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] Ignoring message to destination=/app/commands.BKN01.20140318
2014-05-06 14:26:39 TRACE [clientInboundChannel-1] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Processing message=[Payload byte[314]][Headers={stompCommand=SEND, nativeHeaders={content-type=[application/json;charset=UTF-8], destination=[/topic/commands.BKN01.20140318]}, simpMessageType=MESSAGE, simpDestination=/topic/commands.BKN01.20140318, contentType=application/json;charset=UTF-8, simpSessionId=ehjcoxb3, id=3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd, timestamp=1399400799947}]
2014-05-06 14:26:39 DEBUG [clientInboundChannel-1] o.s.m.s.s.StompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] Forwarding message to broker

We also turned tracing on for StompSubProtocolHandler in the org.springframework.web.socket.messaging package and received messages like:

2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] o.s.w.s.m.StompSubProtocolHandler [StompSubProtocolHandler.java:180] Received message from client session=u8wrnsr6

None of this information provides an easy way to map our message.serverSequence property (which is set prior to sending to the MQ) to the various ids that appear in the logs.

Is there any way to increase the number of inbound/outbound channel threads while keeping ordering intact? For example, could the channels be tied to a "route", or could a thread be pegged to a "route"?

Please help.

Thank you,

Dan

Solution

[EDITED]

Thanks, I see now. Indeed, at present there is no way to ensure that messages from the external broker are delivered to a client in the exact same order, or that messages from a client are sent to the external broker in the exact same order.

We could assign an index to every Message within a session and then check and buffer if necessary to enforce the order, but that would be a new feature. Feel free to create a request in JIRA.
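To make the "check and buffer" idea concrete, here is a minimal sketch in plain Java. It is not part of Spring: the class name Resequencer and its accept method are made up for illustration, and it assumes every message of a session already carries a gap-free index starting at a known value. Messages that arrive early are held back until the missing ones show up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Spring class): releases payloads in index order. */
class Resequencer<T> {
    // Payloads that arrived ahead of their turn, keyed by index.
    private final Map<Long, T> pending = new HashMap<>();
    // The index that must be delivered next.
    private long nextIndex;

    Resequencer(long firstIndex) {
        this.nextIndex = firstIndex;
    }

    /**
     * Accepts one payload and returns every payload that is now deliverable,
     * in index order. Returns an empty list while a gap is still open.
     */
    synchronized List<T> accept(long index, T payload) {
        List<T> ready = new ArrayList<>();
        if (index < nextIndex) {
            return ready; // already delivered earlier: treat as a duplicate and drop it
        }
        pending.put(index, payload);
        // Drain the buffer for as long as the next expected index is present.
        while (pending.containsKey(nextIndex)) {
            ready.add(pending.remove(nextIndex));
            nextIndex++;
        }
        return ready;
    }
}
```

You would keep one such instance per session (or per route). Note that a message that never arrives would stall everything behind it, so a real implementation would probably also need a timeout or a bound on the buffer size.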

As for now, you could look into creating a ChannelInterceptor that checks every message to see what session it belongs to and then sets an incremental index header on it before it is sent (on the clientInboundChannel or clientOutboundChannel). Then extend StompSubProtocolHandler and StompBrokerRelayMessageHandler to check the index header and try to enforce the order of messages.
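The per-session index part of that suggestion could look roughly like the sketch below. It is plain Java with no Spring types: SessionSequencer, nextIndex, and remove are hypothetical names. The idea would be to call nextIndex from the interceptor's preSend with the message's session id and copy the result into a custom header of your choosing; that only helps if the interceptor sees the messages while they are still in their original order, which I am assuming here rather than guaranteeing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper (not a Spring class): one increasing counter per session id. */
class SessionSequencer {
    private final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

    /** Returns 0 for the first message of a session, then 1, 2, and so on. */
    long nextIndex(String sessionId) {
        // computeIfAbsent creates the counter atomically on first use.
        return counters.computeIfAbsent(sessionId, id -> new AtomicLong()).getAndIncrement();
    }

    /** Forgets a session, e.g. on disconnect, so the map does not grow without bound. */
    void remove(String sessionId) {
        counters.remove(sessionId);
    }
}
```

The receiving side (the extended handlers mentioned above) would then read the index header and hold back any message whose index is ahead of the one it expects next.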

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow