Question

I'm designing a quite complicated system and was wondering what the best way is to put a JMS consumer (ActiveMQ, VM protocol, non-persistent) inside a Netty handler.

Let me explain: I have several clients connecting to my Netty server using WebSockets. For every client connection I create a JMS consumer that listens for interesting messages on one or more topics. If an interesting message arrives I need to do an extra step (additional filtering) before sending the message to the client over the WebSocket.

Is the following a good way to do this:

  • inside a SimpleChannelInboundHandler I declare a private non-static consumer
  • the consumer is initialized in channelActive
  • the consumer is destroyed in channelInactive
  • when a message is received by the consumer I do the extra filtering and send it using ctx.channel().write() (see the sketch after this list)
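
Roughly, this is what I have in mind (a minimal sketch; the topic name, the filtering and the text conversion are placeholders, and I assume the shared JMS connection is created and started elsewhere):

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

public class PushHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final Connection jmsConnection;  // shared connection, injected elsewhere
    private Session session;                 // per-channel JMS session
    private MessageConsumer consumer;        // per-channel, non-static consumer

    public PushHandler(Connection jmsConnection) {
        this.jmsConnection = jmsConnection;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        session = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        consumer = session.createConsumer(session.createTopic("interesting.topic"));
        consumer.setMessageListener(msg -> {
            // Runs on the JMS dispatch thread; writeAndFlush hands the frame
            // over to the channel's event loop, so this is safe to call here.
            if (passesExtraFilter(msg)) {
                ctx.channel().writeAndFlush(new TextWebSocketFrame(toText(msg)));
            }
        });
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (consumer != null) consumer.close();
        if (session != null) session.close();
        ctx.fireChannelInactive();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // inbound frames from the client would be handled here
    }

    private boolean passesExtraFilter(Message msg) { return true; }   // placeholder
    private String toText(Message msg) { return msg.toString(); }     // placeholder
}
```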

With this setup I'm a bit worried that the consumer might turn into a slow consumer and slow everything down, because the WebSocket goes over the internet.

So I came up with a more complex setup to decouple the receiving of a message by the consumer from the sending of that message over the WebSocket:

  • inside a SimpleChannelInboundHandler I declare a private non-static consumer
  • the consumer is initialized in channelActive
  • the consumer is destroyed in channelInactive
  • when a message is received by the consumer I put it in a blocking queue
  • every minute I let a thread (created for every client) look in the queue and send any messages it finds to the client using ctx.channel().write() (sketched below)
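
Roughly what I mean, continuing the handler sketch above (the per-client single-thread executor stands in for the "extra thread per client", and the one-minute interval is just what I described; names are illustrative):

```java
// Extra pieces in the same handler (uses java.util.concurrent types).
private final BlockingQueue<Message> pending = new LinkedBlockingQueue<>();
private ScheduledExecutorService drainer;   // the per-client thread I'm worried about

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // ... create the session and consumer as before, but only queue the message here ...
    consumer.setMessageListener(msg -> {
        if (passesExtraFilter(msg)) {
            pending.offer(msg);
        }
    });

    drainer = Executors.newSingleThreadScheduledExecutor();
    drainer.scheduleAtFixedRate(() -> {
        List<Message> batch = new ArrayList<>();
        pending.drainTo(batch);                       // take everything queued so far
        for (Message m : batch) {
            ctx.channel().write(new TextWebSocketFrame(toText(m)));
        }
        ctx.channel().flush();
    }, 1, 1, TimeUnit.MINUTES);

    ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    if (drainer != null) drainer.shutdownNow();
    // ... close the consumer and session as before ...
    ctx.fireChannelInactive();
}
```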

At this point I'm a bit worried about the extra thread per client.

Or is there maybe a better way to accomplish this task?


Solution

This is a classic slow-consumer problem, and the first step to resolving it is to decide what the appropriate action is when a slow consumer is detected. If it is acceptable for the slow consumer to miss messages, then the solution is some variation on dropping messages or unsubscribing it from the feed. For example, if it's acceptable that the client misses messages then, when one is received from JMS, check whether the channel is writable. If it isn't, drop the message. If you want to give yourself a bit more of a buffer (although OS buffers are quite large), you can track the number of write-completion futures that haven't completed yet (i.e. messages that haven't been written to the OS send buffer) and drop messages if there are too many outstanding write requests.
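
As a rough sketch of both checks, inside your per-channel handler (the threshold, field and method names here are mine, purely for illustration; it uses io.netty.channel.Channel/ChannelFuture/ChannelFutureListener, java.util.concurrent.atomic.AtomicInteger and javax.jms.Message):

```java
// Fields in the per-channel handler (illustrative names).
private final AtomicInteger outstandingWrites = new AtomicInteger();
private static final int MAX_OUTSTANDING = 100;   // arbitrary cut-off, tune as needed

// Called from the JMS MessageListener for each message that passed the extra filter.
void forwardOrDrop(Channel channel, Message msg) {
    // Netty's outbound buffer has hit its high-water mark: this client is slow, so drop.
    if (!channel.isWritable()) {
        return;
    }
    // Optional extra headroom: too many earlier writes haven't completed yet,
    // i.e. haven't reached the OS send buffer.
    if (outstandingWrites.get() >= MAX_OUTSTANDING) {
        return;
    }
    outstandingWrites.incrementAndGet();
    ChannelFuture f = channel.writeAndFlush(new TextWebSocketFrame(render(msg)));
    f.addListener((ChannelFutureListener) future -> outstandingWrites.decrementAndGet());
}

private String render(Message msg) { return msg.toString(); }   // placeholder
```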

If the client may not miss messages, and is consistently slow, then the problem is more difficult. One option might be to divert messages to a JMS queue with a specific header value, then open a new consumer that reads messages from that queue using a JMS selector. This will put more load on the JMS server, but it might be appropriate for temporary slowness and hopefully it won't interfere with your main topic feeds. Alternatively, you might want to stash the messages in a different store, such as a database, so you can poll for them when they can be sent. If you do this right, a single polling thread can cope with many clients (query for clients that have outstanding messages, then load a batch of messages for each client). However, this isn't as convenient as using JMS.
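
For example, using only standard JMS calls; the overflow queue name, the "clientId" property and the helper methods are illustrative, not something prescribed by your setup:

```java
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public final class OverflowDiversion {

    // Divert a message the slow client must not lose to a shared overflow queue,
    // tagging it with the client's id as a message property.
    static void divertForLater(Session session, String clientId, String payload) throws JMSException {
        Queue overflow = session.createQueue("client.overflow");
        MessageProducer producer = session.createProducer(overflow);
        try {
            TextMessage copy = session.createTextMessage(payload);
            copy.setStringProperty("clientId", clientId);
            producer.send(copy);
        } finally {
            producer.close();
        }
    }

    // When the client catches up, read back only its own messages using a JMS selector.
    static MessageConsumer openCatchUpConsumer(Session session, String clientId) throws JMSException {
        Queue overflow = session.createQueue("client.overflow");
        return session.createConsumer(overflow, "clientId = '" + clientId + "'");
    }
}
```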

I wouldn't go with option 2 because the blocking queue is only going to solve the problem temporarily, and you can achieve the same thing by tracking how many write operations are waiting to complete.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow