我想有选择地从 AMQP 队列中删除消息,甚至不读取它们。

场景如下:

发送方希望基于类型X的新信息到达的事实来使类型X的消息过期。因为订阅者很可能还没有消费最新的 X 类型消息,所以发布者应该删除以前的 X 类型消息并将最新的消息放入队列中。整个操作对于订阅者来说应该是透明的 - 事实上他应该使用像 STOMP 这样简单的东西来获取消息。

如何使用 AMQP 做到这一点?或者也许在其他消息传递协议中更方便?

我想避免复杂的基础设施。所需的整个消息传递就像上面一样简单:一个队列、一个订阅者、一个发布者,但发布者必须能够根据给定条件临时删除消息。

发布者客户端将使用 Ruby,但实际上,只要我发现如何在协议中执行此操作,我就会处理任何语言。

有帮助吗?

解决方案

目前,您无法在 RabbitMQ(或更一般地说,在 AMQP 中)自动执行此操作。但是,这里有一个简单的解决方法。

假设您想发送三种类型的消息:X、Y 和 Z。如果我正确理解你的问题,当 X 消息到达时,你希望代理忘记所有其他尚未传递的 X 消息。

这在 RabbitMQ 中相当容易做到:

  • 生产者声明三个队列:X、Y 和 Z(它们自动绑定到默认交换机,其名称作为路由键,这正是我们想要的),
  • 当发布消息时,生产者首先清除相关队列(因此​​,如果它发布 X 消息,它首先清除 X 队列);这有效地删除了过时的消息,
  • 消费者只需从它想要的队列中消费(X代表X消息,Y代表Y消息,等等);从它的角度来看,它只需要执行 basic.get 即可获取下一条相关消息。

这意味着当两个生产者几乎同时发送相同类型的消息时出现竞争条件。结果是队列可能同时有两条(或更多)消息,但由于消息数量受到生产者数量的上限,并且多余的消息在下一次发布时被清除,这应该不是什么大问题。

总而言之,该解决方案仅比最优解决方案多了一步,即在发布类型 X 的消息之前清除队列 X。

如果您在设置此配置时需要任何帮助,寻求建议的最佳地点是rabbitmq-discuss 邮件列表。

其他提示

您不希望消息队列,你想有一个键值数据库。例如,你可以使用Redis的或东京暴君得到一个简单的网络访问的键值数据库。或只使用一个内存缓存。

每个消息类型是一个关键。当你写使用相同的密钥生成新的消息时,它会覆盖之前的值,这个数据库的读者将永远无法获得的最新信息了。

在这一点上,你只需要一个消息队列建立在键都应该读的顺序,如果是重要的。否则,只是不断地扫描数据库。如果你不断地扫描数据库,最好是把数据库中的读者近,以减少网络流量。

我可能会做这样的事 key: typecode value: lastUpdated, important data

然后,我会发送包含消息 typecode, lastUpdated这样读者可以比较LASTUPDATED为关键,一个他们从数据库最后一次读取并跳过读它,因为它们已经是最新的。

如果您真的需要AMQP要做到这一点,那么使用的RabbitMQ和一个自定义交换型,特别是最后一个值缓存交换。示例代码是在这里 https://github.com/squaremo/rabbitmq-lvc-plugin

如果您只想从队列中删除前 n 条消息,它似乎也可以在 RabbitMQ Web-UI 上工作

  • 从“队列”选项卡中选择队列,向下滚动到“获取消息”部分
  • 设置参数“Requeue=No”以及要从队列中删除的消息数
  • 按“获取消息”按钮

此问题有由于它的标题的高可见性。通过描述要与更具体的方案同居。 因此,对于谁正在寻找真正从队列中删除下一个(记得FIFO)消息的用户,你可以利用rabbitmqadmin并发出以下命令:

rabbitmqadmin get queue=queuename requeue=false count=1

此命令基本上消耗该消息和无所作为。与标志一个完整的命令采取消息(S)的备份可能看起来像下面的一个。确保添加任何其他参数按您的要求。

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top