Comment les messages de suppression sélective à partir d'une file d'attente d'AMQP (RabbitMQ)?

StackOverflow https://stackoverflow.com/questions/3434763

Question

Je voudrais des messages de suppression sélective à partir d'une file d'attente d'AMQP sans même les lire.

Le scénario est le suivant:

L'envoi côté veut expirer les messages de type X basé sur un fait que de nouvelles informations de type X est arrivé. Parce qu'il est très probable que l'abonné n'a pas consommé encore le dernier message de type X, l'éditeur devrait simplement supprimer les messages précédents de type X et de mettre un plus nouveau dans la file d'attente. L'opération devrait être transparente pour l'abonné -. En fait, il devrait utiliser quelque chose d'aussi simple que STOMP pour obtenir les messages

Comment faire en utilisant AMQP? Ou peut-être plus pratique dans un autre protocole de messagerie?

Je voudrais éviter une infrastructure compliquée. La messagerie tout nécessaire est aussi simple que ci-dessus:. Une file d'attente, un abonné, un éditeur, mais l'éditeur doit avoir une capacité à ad-hoc pour la suppression des messages à un critère donné

Le client de l'éditeur utilisera Ruby mais en fait je voudrais traiter de toute langue dès que je découvre comment le faire dans le protocole.

Était-ce utile?

La solution

Vous ne pouvez actuellement faire en RabbitMQ (ou plus généralement, dans AMQP) automatiquement. Mais, voici une solution facile.

Le mot Let vous voulez envoyer trois types de messages: Xs, Ys et Zs. Si je comprends bien votre question, lorsqu'un message X arrive, vous voulez que le courtier oublier tous les autres messages X qui n'ont pas été livrés.

Ceci est assez facile à faire dans RabbitMQ:

  • le producteur déclare trois files d'attente: X, Y et Z (ils sont automatiquement liés à l'échange par défaut avec leurs noms comme clés de routage, ce qui est exactement ce que nous voulons),
  • lors de la publication d'un message, le producteur première purge la file d'attente concernée (ainsi, si elle est la publication d'un message de X, il purge le premier file d'attente de X); Ceci supprime les messages obsolètes,
  • le consommateur consomme simplement de la file d'attente, il veut (X pour les messages X, Y pour les messages Y, etc.); de son point de vue, il suffit de faire un basic.get pour obtenir le prochain message pertinent.

Cela implique une condition de course lorsque deux producteurs envoient le même type de message au sujet du même temps. Le résultat est que son possible pour la file d'attente d'avoir deux messages en même temps (ou plus), mais étant donné que le nombre de messages est supérieur délimité par le nombre de producteurs, et que les messages superflus sont purgées sur la prochaine publication , cela ne devrait pas être un gros problème.

Pour résumer, cette solution a plus qu'à une étape supplémentaire de la file d'attente solution optimale, à savoir X purge avant de publier un message de type X.

Si vous avez besoin d'aide pour configurer cette configuration, l'endroit idéal pour demander des conseils est la liste de diffusion rabbitmq-discuss.

Autres conseils

Vous ne voulez pas une file d'attente de messages, vous voulez une base de données de valeur clé. Par exemple, vous pouvez utiliser Redis ou Tokyo Tyrant pour obtenir une base de données simple réseau accessible valeur clé. Ou tout simplement utiliser un memcache.

Chaque type de message est une clé. Lorsque vous écrivez un nouveau message avec la même clé, elle écrasera la valeur précédente de sorte que le lecteur de cette base de données ne sera jamais en mesure de sortir des informations à jour.

À ce stade, vous avez seulement besoin d'une file d'attente de messages pour établir l'ordre dans lequel les clés doivent être lus, si cela est important. Sinon, la base de données continuellement scanner. Si vous ne numériser en permanence la base de données, il est préférable de mettre la base de données auprès des lecteurs pour réduire le trafic réseau.

Je probablement faire quelque chose comme ça key: typecode value: lastUpdated, important data

Alors j'envoyer des messages contenant typecode, lastUpdated De cette façon, le lecteur peut comparer LastUpdated pour cette clé à celle qu'ils dernière lecture de la base de données et sauter la lecture parce qu'ils sont déjà à jour.

Si vous avez vraiment besoin de le faire avec AMQP, puis utilisez RabbitMQ et un type d'échange personnalisé, spécifiquement une dernière valeur cache Exchange. Exemple de code est ici https://github.com/squaremo/rabbitmq-lvc-plugin

Il semble fonctionner aussi du RabbitMQ Web UI, si vous voulez juste supprimer premiers messages n de la file d'attente

  • sélectionnez la file d'attente onglet « Files d'attente », faites défiler jusqu'à la section « Obtenez des messages »
  • le jeu de paramètres « requeue = Non » et le nombre de messages que vous souhaitez supprimer de la file d'attente
  • appuyez sur le bouton "Get messages"

Cette question a une grande visibilité en raison du titre de celui-ci. En passant par la description demeure avec scénario plus spécifique. Donc, pour les utilisateurs qui cherchent à supprimer en fait le message suivant (souvenez-vous FIFO) de la file d'attente, vous pouvez utiliser rabbitmqadmin et exécutez la commande ci-dessous:

rabbitmqadmin get queue=queuename requeue=false count=1

Cette commande est essentiellement le message consume et ne rien faire. Une commande complète avec le drapeau de prendre la sauvegarde du message (s) pourrait ressembler le ci-dessous un. Assurez-vous d'ajouter d'autres paramètres selon vos besoins.

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top