Question

The scenario: I send 50,000 messages to a queue named JUST.CN, setting the string property myfilter to "abc" on every 1000th message. I then create a consumer with the selector myfilter='abc' to consume those messages. However, the consumption rate is very slow, especially after 30,000 messages. I can't change the default configuration in ActiveMQ. The core code is below:

    IDestination destination = SessionUtil.GetDestination(session, "JUST.CN");
    IMessageProducer producer = session.CreateProducer(destination);
    string msg = "Hello hello hello world!~~testing~Hello hello hello world!~~testing~";

    for (int i = 0; i < 50000; i++)
    {
        ITextMessage message = session.CreateTextMessage(msg);

        // Tag every 1000th message so that the selector can match it.
        if (i % 1000 == 0)
        {
            message.Properties.SetString("myfilter", "abc");
        }

        producer.Send(message, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
    }

The Consumer's code:

    IDestination destination = SessionUtil.GetDestination(session, "JUST.CN");
    IMessageConsumer consumer = session.CreateConsumer(destination, "myfilter='abc'", false);

    int count = 0;
    DateTime dtstart = DateTime.Now;
    for (int i = 0; i < 50; i++)
    {
        IMessage iMsg = consumer.Receive();
        ITextMessage msg = (ITextMessage)iMsg;
        Console.WriteLine(msg.Text);
        count++;
    }
    DateTime dtend = DateTime.Now;
    TimeSpan time = dtend - dtstart;
    Console.WriteLine(time);
    Console.WriteLine(count);

Is there any special setting that I need to use for selectors in ActiveMQ? Thank you in advance for any input.


Solution

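In general, using message selectors with queues is an anti-pattern. There's a good article explaining why on the Ade on Middleware blog from a few years back. In short, the broker typically evaluates a selector only against the messages it has paged into memory, so when matches are as sparse as one in a thousand, the consumer can stall behind a large backlog of non-matching messages.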

If you are looking at using message selectors on queues, the use case is generally that certain consumers are interested in only some messages. You can address this use case in a much nicer way through the use of composite destinations configured on the broker that apply a filter (via filteredDestination) that is the equivalent of the selection logic:

<broker xmlns="http://activemq.apache.org/schema/core">
  <destinationInterceptors>
    <virtualDestinationInterceptor>
      <virtualDestinations>
        <compositeQueue name="myapp.in" forwardOnly="true">
          <forwardTo>
            <filteredDestination selector="myHeader &gt; 5" queue="myapp.out.high"/>
            <filteredDestination selector="myHeader &lt;= 5" queue="myapp.out.low"/>
          </forwardTo>
        </compositeQueue>
      </virtualDestinations>
    </virtualDestinationInterceptor>
  </destinationInterceptors>
</broker> 

What happens here is that the SQL92 filter is run against the message when it arrives in the myapp.in queue, and the messages are sorted appropriately. A subscriber that wants to consume only the high messages subscribes to myapp.out.high.
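Applied to the queue from the question, the same idea might look like the sketch below. The target queue names JUST.CN.abc and JUST.CN.other are hypothetical, and the second filter is an assumption about what should happen to untagged messages: with forwardOnly="true", a message that matches none of the filters is not delivered to any queue, so a catch-all is included. Note that this does require a change to the broker configuration.

```xml
<broker xmlns="http://activemq.apache.org/schema/core">
  <destinationInterceptors>
    <virtualDestinationInterceptor>
      <virtualDestinations>
        <!-- Producers keep sending to JUST.CN; the broker routes on arrival. -->
        <compositeQueue name="JUST.CN" forwardOnly="true">
          <forwardTo>
            <!-- Hypothetical queue for the tagged messages. -->
            <filteredDestination selector="myfilter = 'abc'" queue="JUST.CN.abc"/>
            <!-- Catch-all: IS NULL is needed because a comparison against a
                 missing property evaluates to unknown, not true. -->
            <filteredDestination selector="myfilter IS NULL OR myfilter &lt;&gt; 'abc'" queue="JUST.CN.other"/>
          </forwardTo>
        </compositeQueue>
      </virtualDestinations>
    </virtualDestinationInterceptor>
  </destinationInterceptors>
</broker>
```

The consumer would then read from JUST.CN.abc without any selector.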

By doing this you are effectively turning the problem upside down and removing the need for complicated processing when consuming messages.
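On the consuming side, the selector simply disappears. The following is a minimal NMS sketch, assuming a broker at tcp://localhost:61616 (an assumed address) and the myapp.out.high queue from the configuration above; the class name and the five-second receive timeout are arbitrary choices for illustration.

```csharp
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Util;

class FilteredQueueConsumer
{
    static void Main()
    {
        // Assumed broker address; adjust for your environment.
        IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616");

        using (IConnection connection = factory.CreateConnection())
        using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
        {
            connection.Start();

            // The broker has already filtered the messages into this queue,
            // so the consumer is created without a selector.
            IDestination destination = SessionUtil.GetDestination(session, "queue://myapp.out.high");

            using (IMessageConsumer consumer = session.CreateConsumer(destination))
            {
                IMessage received;
                // Stop once no message arrives within the timeout.
                while ((received = consumer.Receive(TimeSpan.FromSeconds(5))) != null)
                {
                    ITextMessage text = received as ITextMessage;
                    if (text != null)
                    {
                        Console.WriteLine(text.Text);
                    }
                }
            }
        }
    }
}
```

Because every message in the queue is one the consumer wants, the broker can dispatch them in order without evaluating a selector per message.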

Licensed under: CC-BY-SA with attribution