我们有一个MSMQ队列设置,该设置接收消息并通过应用程序处理。我们想让另一个过程订阅队列,只需阅读消息并记录其内容即可。

我已经有了这个问题,问题在于它不断窥视队列。服务器上运行时的CPU约为40%。 MQSVC.EXE以30%的速度运行,此应用程序以10%的速度运行。我宁愿只等待一条消息传来,收到通知它,然后在不经常轮询服务器的情况下对其进行登录。

    Dim lastid As String
    Dim objQueue As MessageQueue
    Dim strQueueName As String

    Public Sub Main()
        objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
        Dim propertyFilter As New MessagePropertyFilter
        propertyFilter.ArrivedTime = True
        propertyFilter.Body = True
        propertyFilter.Id = True
        propertyFilter.LookupId = True
        objQueue.MessageReadPropertyFilter = propertyFilter
        objQueue.Formatter = New ActiveXMessageFormatter
        AddHandler objQueue.PeekCompleted, AddressOf MessageFound

        objQueue.BeginPeek()
    end main

    Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)

        Dim oQueue As MessageQueue
        Dim oMessage As Message

        ' Retrieve the queue from which the message originated
        oQueue = CType(s, MessageQueue)

            oMessage = oQueue.EndPeek(args.AsyncResult)
            If oMessage.LookupId <> lastid Then
                ' Process the message here
                lastid = oMessage.LookupId
                ' let's write it out
                log.write(oMessage)
            End If

        objQueue.BeginPeek()
    End Sub
有帮助吗?

解决方案

在窥视迭代之间进行的螺纹。Sleep(10)可以为您节省一堆周期。

我能想到的唯一其他选择是将登录构建到队列阅读应用程序中。

其他提示

您是否尝试过使用 msmqevent.arriver 跟踪消息?

当MSMQQueue.MSMQQueue.EnablEnotification方法的到达事件被触发。

没有API可以让您只能窥视每条消息一次。

问题是 BeginPeek 如果队列上已经有消息,请立即执行其回调。由于您没有删除消息(这是 窥视 毕竟,没有收到!),当您的回调再次开始窥视时,该过程开始了, MessageFound 几乎不断运行。

您的最佳选择是记录作者或读者中的消息。 日记 将在短时间内工作(如果您只关心收到的消息),但不是一个长期解决方案:

虽然从队列配置的队列中检索消息的性能开销仅比没有记录的消息检索消息高约20%,但实际成本是当不受控制的MSMQ服务从内存中运行或机器不在磁盘上时引起的意外问题空间

这对我有用。它在等待消息时阻止线程。每个循环周期都检查班级成员 _bServiceRunning 查看线程是否应该中止。

    private void ProcessMessageQueue(MessageQueue taskQueue)
    {
        // Set the formatter to indicate body contains a binary message:
        taskQueue.Formatter = new BinaryMessageFormatter();

        // Specify to retrieve selected properties.
        MessagePropertyFilter myFilter = new MessagePropertyFilter();
        myFilter.SetAll();
        taskQueue.MessageReadPropertyFilter = myFilter;

        TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds

        // Monitor the MSMQ until the service is stopped:
        while (_bServiceRunning)
        {
            rxMessage = null;

            // Listen to the queue for the configured duration:
            try
            {
                // See if a message is available, and if so remove if from the queue if any required
                // web service is available:
                taskQueue.Peek(tsQueueReceiveTimeout);

                // If an IOTimeout was not thrown, there is a message in the queue
                // Get all the messages; this does not remove any messages
                Message[] arrMessages = taskQueue.GetAllMessages();

                // TODO: process the message objects here;
                //       they are copies of the messages in the queue
                //       Note that subsequent calls will return the same messages if they are
                //       still on the queue, so use some structure defined in an outer block
                //       to identify messages already processed.

            }
            catch (MessageQueueException mqe)
            {
                if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    // The peek message time-out has expired; there are no messages waiting in the queue
                    continue; // at "while (_bServiceRunning)"
                }
                else
                {
                    ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
                    break; // from "while (_bServiceRunning)"
                }
            }
            catch (Exception ex)
            {
                ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
                break; // from "while (_bServiceRunning)"
            }
        }

    } // ProcessMessageQueue()

恕我直言,您应该在队列上打开日记。然后,您可以保证保留所有已授予队列的消息的副本,而您的劳动尝试并不是这样,以制作自己的机制来记录所有内容。

如果您想要比队列本身更容易读取的内容,那么要在计划的基础上记录和删除日记帐消息的地方要容易得多,而且更可靠的是(我当然想要)。那么,该过程的工作速度或不重要,您只需要一次获取消息,这只是解决问题的更好方法。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top