Вопрос

Я хотел бы выборочно удалить сообщения из очереди AMQP, даже не читая их.

Сценарий выглядит следующим образом:

Отправка сторонна хочет истекать сообщения типа X на основе факта, что новая информация о типе X приехала. Поскольку очень вероятно, что подписчик не потребляет последнее сообщение о типе X, Publisher должен просто удалить предыдущие сообщения X-типа и поставить новейшую в очередь. Вся операция должна быть прозрачной для абонента - на самом деле он должен использовать что-то так просто, как STOP, чтобы получить сообщения.

Как это сделать с помощью AMQP? Или, может быть, это удобнее в другом протоколе обмена сообщениями?

Я хотел бы избежать сложной инфраструктуры. Все необходимые обмен сообщениями так же просто, как указано выше: одна очередь, один подписчик, один издатель, но издатель должен иметь возможность AD-HOC удалять сообщения для заданных критериев.

Клиент издателя будет использовать Ruby, но на самом деле я буду иметь дело с любым языком, как только я узнаю, как это сделать в протоколе.

Это было полезно?

Решение

В настоящее время вы не можете сделать это в Rabbitmq (или в целом, в AMQP) автоматически. Но вот легкий обходной путь.

Допустим, вы хотите отправить три типа сообщений: XS, YS и ZS. Если я правильно понимаю ваш вопрос, когда приходит сообщение X, вы хотите, чтобы брокер забыл все остальные сообщения X, которые не были доставлены.

Это довольно легко сделать в rabbitmq:

  • Производитель объявляет три очередя: X, Y и Z (они автоматически связаны с обменом по умолчанию с их именами в качестве клавиш маршрутизации, что именно то, что мы хотим),
  • При публикации сообщения производитель сначала очищает соответствующую очередь (так, если она публикует сообщение X, сначала очищает очередь X); Это эффективно удаляет устаревшие сообщения,
  • Потребитель просто потребляет от очереди, которую он хочет (х для сообщений x, y для сообщений Y и т. Д.); С его точки зрения, он просто должен сделать Basic. Получить следующее соответствующее сообщение.

Это подразумевает состояние гонки, когда два производителя отправляют одно и то же тип сообщения при примерно в то же время. Результатом является то, что его возможно для очереди иметь два (или более) сообщения одновременно, но поскольку количество сообщений верхнее ограничено числом производителей, а поскольку лишние сообщения очищены на следующем публике , это не должно быть большим из проблем.

Подводя итоги, это решение имеет только один дополнительный шаг от оптимального решения, а именно в очередь keue X перед публикацией сообщения типа X.

Если вам нужна помощь в настройке этой конфигурации, идеальное место для получения совета - это список рассылки Rabbitmq.

Другие советы

Вы не хотите, чтобы очередь сообщений вам нужна база данных ключа. Например, вы можете использовать Redis или Tokyo Tyrant, чтобы получить простую доступную ключевую базу данных ключей. Или просто используйте Memcache.

Каждый тип сообщения является ключом. Когда вы пишете новое сообщение с тем же ключом, он перезаписывает предыдущее значение, поэтому читатель этой базы данных никогда не сможет устареть информацию.

На данный момент вам нужна только очередь сообщений, чтобы установить заказ, в котором следует прочитать ключи, если это важно. В противном случае просто постоянно сканируйте базу данных. Если вы постоянно сканируете базу данных, лучше всего поставить базу данных рядом с читателями, чтобы уменьшить сетевой трафик.

Я бы, вероятно, сделал что-то вроде этогоkey: typecode value: lastUpdated, important data

Тогда я бы отправил сообщения, которые содержатtypecode, lastUpdated Таким образом, читатель может сравнить Fastupdated для этого ключа к тому, которое они в последний раз прочитаны из базы данных и пропускают его, потому что они уже в курсе.

Если вам действительно нужно сделать это с AMQP, затем используйте RABBITMQ и пользовательский тип обмена, в частности, последнюю ценность кэша обмена. Пример код здесь https://github.com/squaremo/rabbitmq-lvc-plugin.

Похоже, работает также от Web-ui rabbitmq, если вы просто хотите удалить первые N сообщений из очереди

  • Выберите очередь из вкладки «Очески», прокрутите вниз до раздела «Получить сообщения»
  • Установите параметр «requeue = NO» и количество сообщений, которые вы хотите удалить из очереди
  • Нажмите кнопку «Получить сообщения»

Этот вопрос имеет высокую видимость из-за этого название этого. Проходя описание останавливается с более конкретным сценарием. Поэтому для тех пользователей, которые стремятся на самом деле удалить следующую (запомнить FIFO) сообщение из очереди, вы можете использовать rabbitmqadmin и выдать команду ниже:

rabbitmqadmin get queue=queuename requeue=false count=1

Эта команда по существу потребляет сообщение и ничего не делает. Полная команда с флагом для выполнения резервного копирования сообщения (ах) может выглядеть следующим образом. Обязательно добавьте любые другие параметры согласно вашему требованию.

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top