AMQP (RabbitMQ) キューからメッセージを選択的に削除するにはどうすればよいですか?

StackOverflow https://stackoverflow.com/questions/3434763

質問

AMQP キューからメッセージを読み取らずに選択的に削除したいと考えています。

シナリオは次のとおりです。

送信側は、タイプ X の新しい情報が到着したという事実に基づいて、タイプ X のメッセージを期限切れにしたいと考えています。サブスクライバがタイプ X の最新のメッセージをまだ消費していない可能性が非常に高いため、パブリッシャーは以前の X タイプのメッセージを削除し、最新のメッセージをキューに入れる必要があります。操作全体は加入者にとって透過的である必要があります。実際、加入者はメッセージを取得するために STOMP のような単純なものを使用する必要があります。

AMQP を使用してそれを行うにはどうすればよいですか?それとも、別のメッセージング プロトコルの方が便利なのでしょうか?

複雑なインフラストラクチャは避けたいと考えています。必要なメッセージング全体は上記のように単純です。キューが 1 つ、サブスクライバーが 1 つ、パブリッシャーが 1 つですが、パブリッシャーは特定の基準に従ってメッセージをアドホックに削除できる必要があります。

パブリッシャークライアントは Ruby を使用しますが、実際には、プロトコル内でそれを行う方法が判明次第、どの言語でも扱うつもりです。

役に立ちましたか?

解決

現在、RabbitMQ (より一般的には AMQP) でこれを自動的に行うことはできません。ただし、簡単な回避策があります。

3 種類のメッセージを送信するとします。X、Y、Z。私の質問の理解が正しければ、X メッセージが到着したときに、ブローカーが配信されていない他のすべての X メッセージを忘れるようにする必要があります。

これは RabbitMQ で非常に簡単に実行できます。

  • プロデューサは 3 つのキューを宣言します。X、Y、Z (これらは、ルーティング キーとして名前を使用してデフォルトの交換局に自動的にバインドされます。これはまさに私たちが望んでいることです)、
  • メッセージをパブリッシュするとき、プロデューサはまず関連するキューをパージします (つまり、X メッセージをパブリッシュする場合は、最初に X キューをパージします)。これにより、古いメッセージが効果的に削除されます。
  • コンシューマは、必要なキューから単に消費します (X メッセージには X、Y メッセージには Y など)。その観点から見ると、次の関連メッセージを取得するには、basic.get を実行するだけです。

これは、2 つのプロデューサーがほぼ同時に同じ種類のメッセージを送信した場合に競合状態が発生することを意味します。その結果、キューに同時に 2 つ (またはそれ以上) のメッセージが存在する可能性がありますが、メッセージの数はプロデューサーの数によって上限が決められており、余分なメッセージは次回のパブリッシュ時にパージされるためです。 、これはそれほど問題ではないはずです。

要約すると、このソリューションには、最適なソリューションから追加の手順が 1 つだけあります。つまり、タイプ X のメッセージを公開する前にキュー X をパージするということです。

この構成のセットアップにサポートが必要な場合は、rabbitmq-discuss メーリング リストがアドバイスを求めるのに最適な場所です。

他のヒント

必要なのはメッセージ キューではなく、キーと値のデータベースです。たとえば、Redis または Tokyo Tyrant を使用して、ネットワークからアクセスできる単純なキーと値のデータベースを取得できます。または、単に memcache を使用します。

各メッセージ タイプはキーです。同じキーを使用して新しいメッセージを書き込むと、以前の値が上書きされるため、このデータベースの読み取り者は古い情報を取得できなくなります。

この時点で必要なのは、重要な場合にキーを読み取る順序を確立するためのメッセージ キューだけです。それ以外の場合は、データベースを継続的にスキャンしてください。データベースを継続的にスキャンする場合は、ネットワーク トラフィックを減らすためにデータベースをリーダーの近くに置くことが最善です。

私はおそらくこのようなことをするでしょうkey: typecode value: lastUpdated, important data

次に、次の内容のメッセージを送信します。typecode, lastUpdated そうすることで、リーダーはそのキーの lastupdated とデータベースから最後に読み取ったキーを比較し、すでに最新であるためそのキーの読み取りをスキップできます。

AMQP を使用してこれを行う必要がある場合は、RabbitMQ とカスタム交換タイプ、具体的には Last Value Cache Exchange を使用してください。サンプルコードはこちら https://github.com/squaremo/rabbitmq-lvc-plugin

最初の n 個のメッセージをキューから削除したいだけの場合は、RabbitMQ Web-UI からも機能するようです

  • 「キュー」タブからキューを選択し、「メッセージの取得」セクションまで下にスクロールします。
  • パラメータ「Requeue=No」とキューから削除するメッセージの数を設定します
  • 「メッセージを取得」ボタンを押してください

この質問は、タイトルのせいで注目度が高いです。より具体的なシナリオについて説明します。したがって、キューから次の (FIFO を思い出してください) メッセージを実際に削除したいユーザーの場合は、rabbitmqadmin を利用して以下のコマンドを発行できます。

rabbitmqadmin get queue=queuename requeue=false count=1

このコマンドは基本的にメッセージを消費し、何も行いません。メッセージのバックアップを作成するためのフラグを含む完全なコマンドは、次のようになります。要件に応じて他のパラメータを必ず追加してください。

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top