MSMQキューを購読するにはどうすればよいですか?
-
02-10-2019 - |
質問
メッセージを受信し、アプリケーションによって処理されるMSMQキューセットアップがあります。別のプロセスをキューに登録して、メッセージを読んで内容を記録したいと思います。
私はすでにこれを導入しています、問題はそれが絶えずキューを覗いていることです。これが実行中のサーバー上のCPUは約40%です。 MQSVC.EXEは30%で実行され、このアプリは10%で実行されます。メッセージが入ってくるのを待って、それを取得し、サーバーに絶えずポーリングせずにログを記録するものがあります。
Dim lastid As String
Dim objQueue As MessageQueue
Dim strQueueName As String
Public Sub Main()
objQueue = New MessageQueue(strQueueName, QueueAccessMode.SendAndReceive)
Dim propertyFilter As New MessagePropertyFilter
propertyFilter.ArrivedTime = True
propertyFilter.Body = True
propertyFilter.Id = True
propertyFilter.LookupId = True
objQueue.MessageReadPropertyFilter = propertyFilter
objQueue.Formatter = New ActiveXMessageFormatter
AddHandler objQueue.PeekCompleted, AddressOf MessageFound
objQueue.BeginPeek()
end main
Public Sub MessageFound(ByVal s As Object, ByVal args As PeekCompletedEventArgs)
Dim oQueue As MessageQueue
Dim oMessage As Message
' Retrieve the queue from which the message originated
oQueue = CType(s, MessageQueue)
oMessage = oQueue.EndPeek(args.AsyncResult)
If oMessage.LookupId <> lastid Then
' Process the message here
lastid = oMessage.LookupId
' let's write it out
log.write(oMessage)
End If
objQueue.BeginPeek()
End Sub
解決
覗き見の間にスレッド(10)は、繰り返しの間に多くのサイクルを節約する場合があります。
私が考えることができる他の唯一のオプションは、キュー読み取りアプリケーションにログインすることを構築することです。
他のヒント
使用してみましたか msmqevent.arrived メッセージを追跡するには?
msmqqueue.msmqqueue.Enablenotificationメソッドの到着イベントは、開いたキューを表すMSMQqueueオブジェクトのインスタンスのエンパレノライテンス方法が呼び出され、メッセージが見つかったか、キュー内の適用される位置に到着します。
各メッセージを一度だけ覗くことができるAPIはありません。
問題はそれです BeginPeek
キューにメッセージが既にある場合は、すぐにコールバックを実行します。あなたはメッセージを削除していないので(これはそうです ピーク 結局のところ、受信しないでください!)、あなたのコールバックが再び覗き始めると、プロセスが始まりますので、 MessageFound
ほぼ絶えず動作します。
あなたの最良のオプションは、ライターまたは読者のメッセージをログに記録することです。 ジャーナリング 短期間は機能します(受信されたメッセージのみを気にかけている場合)が、長期的な解決策ではありません。
ジャーナリング用に構成されているキューからメッセージを取得するパフォーマンスオーバーヘッドは、ジャーナリングなしでメッセージを取得するよりも約20%しか大きいですが、実際のコストは、未チェックのMSMQサービスがメモリから実行されているか、マシンがディスクから外れている場合に発生する予期しない問題です。スペース
これは私のために働きます。メッセージを待っている間にスレッドをブロックします。各ループサイクルはクラスメンバーをチェックします _bServiceRunning
スレッドが中止されるべきかどうかを確認します。
private void ProcessMessageQueue(MessageQueue taskQueue)
{
// Set the formatter to indicate body contains a binary message:
taskQueue.Formatter = new BinaryMessageFormatter();
// Specify to retrieve selected properties.
MessagePropertyFilter myFilter = new MessagePropertyFilter();
myFilter.SetAll();
taskQueue.MessageReadPropertyFilter = myFilter;
TimeSpan tsQueueReceiveTimeout = new TimeSpan(0, 0, 10); // 10 seconds
// Monitor the MSMQ until the service is stopped:
while (_bServiceRunning)
{
rxMessage = null;
// Listen to the queue for the configured duration:
try
{
// See if a message is available, and if so remove if from the queue if any required
// web service is available:
taskQueue.Peek(tsQueueReceiveTimeout);
// If an IOTimeout was not thrown, there is a message in the queue
// Get all the messages; this does not remove any messages
Message[] arrMessages = taskQueue.GetAllMessages();
// TODO: process the message objects here;
// they are copies of the messages in the queue
// Note that subsequent calls will return the same messages if they are
// still on the queue, so use some structure defined in an outer block
// to identify messages already processed.
}
catch (MessageQueueException mqe)
{
if (mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
{
// The peek message time-out has expired; there are no messages waiting in the queue
continue; // at "while (_bServiceRunning)"
}
else
{
ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, mqe);
break; // from "while (_bServiceRunning)"
}
}
catch (Exception ex)
{
ErrorNotification.AppLogError("MSMQ Receive Failed for queue: " + mqs.Name, ex);
break; // from "while (_bServiceRunning)"
}
}
} // ProcessMessageQueue()
iMhoキューでジャーナリングをオンにする必要があります。次に、キューにコミットされたすべてのメッセージのコピーが保持されていることが保証されています。それは、すべてを記録するための独自のメカニズムを作成しようとする労力をかけることには当てはまりません。
はるかに簡単で信頼性が高く、キュー自体よりも簡単に読みやすいものが必要な場合は、スケジュールされたメッセージをスケジュールされたベースでログと削除することです(そして、私は確かにそれを望んでいます)。その場合、プロセスがどれほど速く動作しているかどうかは本当に問題ではありません。メッセージを一度に取得する必要があり、全体的に問題を解決するためのはるかに優れた方法です。