Question

J'utilise PIKA pour traiter les données de RabbitMQ. Comme j'ai semblé rencontrer différents types de problèmes, j'ai décidé d'écrire une petite application de test pour voir comment je peux gérer les déconnexions.

J'ai écrit cette application de test qui suit:

  1. Connectez-vous au courtier, réessayez jusqu'à succès
  2. Lorsqu'il est connecté, créez une file d'attente.
  3. Consommez cette file d'attente et mettez le résultat dans une file d'attente Python.
  4. Obtenez l'article de la queue.queue (0) et produisez-le dans la file d'attente du courtier.

Ce que j'ai remarqué, ce sont 2 numéros:

  1. Lorsque j'exécute mon script à partir d'un hôte se connectant à RabbitMQ sur un autre hôte (à l'intérieur d'une machine virtuelle), ces scripts sort sur des moments aléatoires sans produire d'erreur.
  2. Lorsque j'exécute mon script sur le même hôte sur lequel Rabbitmq est installé, il fonctionne bien et continue de fonctionner.

Cela peut être expliqué en raison des problèmes de réseau, les paquets ont chuté, bien que je trouve la connexion pas vraiment robuste.

Lorsque le script s'exécute localement sur le serveur RabbitMQ et que je tue le RabbitMQ, le script sort avec erreur: "Erreur Pika SelectConnection: Erreur de socket sur 3: 104"

Il semble donc que je ne puisse pas faire fonctionner la stratégie de reconnexion comme il se doit. Quelqu'un pourrait-il jeter un œil au code pour voir ce que je fais de mal?

Merci,

Geai

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)

        self.daemon=True
    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
        pass
    def on_connected(self,connection):
        connection.channel(self.on_channel_open)
    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')    
    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)
    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=self.properties)
if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err
Était-ce utile?

La solution

Le problème principal avec votre script est qu'il interagit avec un seul canal à partir de votre fil principal (où l'ioloop fonctionne) et le fil "courtier" (appelle submitData dans une boucle). C'est pas sécurisé.

Aussi, SimpleReconnectionStrategy ne semble rien faire d'utile. Il ne provoque pas de reconnexion si la connexion est interrompue. Je crois que c'est un bug dans Pika: https://github.com/pika/pika/issues/120

J'ai essayé de refactorricant votre code pour que cela fonctionne comme je pense que vous le vouliez, mais j'ai rencontré un autre problème. PIKA ne semble pas avoir de moyen de détecter la défaillance de la livraison, ce qui signifie que les données peuvent être perdues si la connexion baisse. Cela semble être une exigence si évidente! Comment peut-il y avoir aucun moyen de détecter cela basic_publish manqué? J'ai essayé toutes sortes de choses, y compris les transactions et add_on_return_callback (qui semblaient tous maladroits et trop compliqués), mais n'ont rien fait. S'il n'y a vraiment aucun moyen, PIKA ne semble être utile que dans des situations qui peuvent tolérer la perte de données envoyées à RabbitMQ, ou dans des programmes qui n'ont besoin de consommer que de RabbitMQ.

Ce n'est pas fiable, mais pour référence, voici un code qui résout votre problème multi-thread:

import logging
import pika
import Queue
import sys
import threading
import time
from functools import partial
from pika.adapters import SelectConnection, BlockingConnection
from pika.exceptions import AMQPConnectionError
from pika.reconnection_strategies import SimpleReconnectionStrategy

log = logging.getLogger(__name__)

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2)


class Broker(object):

    def __init__(self, parameters, on_channel_open, name='broker'):
        self.parameters = parameters
        self.on_channel_open = on_channel_open
        self.name = name

    def connect(self, forever=False):
        name = self.name
        while True:
            try:
                connection = SelectConnection(
                    self.parameters, self.on_connected)
                log.debug('%s connected', name)
            except Exception:
                if not forever:
                    raise
                log.warning('%s cannot connect', name, exc_info=True)
                time.sleep(10)
                continue

            try:
                connection.ioloop.start()
            finally:
                try:
                    connection.close()
                    connection.ioloop.start() # allow connection to close
                except Exception:
                    pass

            if not forever:
                break

    def on_connected(self, connection):
        connection.channel(self.on_channel_open)


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES):
    def on_queue_declared(frame):
        # PROBLEM pika does not appear to have a way to detect delivery
        # failure, which means that data could be lost if the connection
        # drops...
        channel.confirm_delivery(on_delivered)
        submit_data()

    def on_delivered(frame):
        if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']:
            log.info('submission confirmed %r', frame)
            # increasing this value seems to cause a higher failure rate
            time.sleep(0)
            submit_data()
        else:
            log.warn('submission failed: %r', frame)
            #data_queue.put(...)

    def submit_data():
        log.info('waiting on data queue')
        data = data_queue.get()
        log.info('got data to submit')
        channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=properties,
                    mandatory=True)
        log.info('submitted data to broker')

    channel.queue_declare(
        queue='sandbox', durable=True, callback=on_queue_declared)


def blocking_submitter(parameters, data_queue,
        properties=DEFAULT_PROPERTIES):
    while True:
        try:
            connection = BlockingConnection(parameters)
            channel = connection.channel()
            channel.queue_declare(queue='sandbox', durable=True)
        except Exception:
            log.error('connection failure', exc_info=True)
            time.sleep(1)
            continue
        while True:
            log.info('waiting on data queue')
            try:
                data = data_queue.get(timeout=1)
            except Queue.Empty:
                try:
                    connection.process_data_events()
                except AMQPConnectionError:
                    break
                continue
            log.info('got data to submit')
            try:
                channel.basic_publish(exchange='',
                            routing_key='sandbox',
                            body=data,
                            properties=properties,
                            mandatory=True)
            except Exception:
                log.error('submission failed', exc_info=True)
                data_queue.put(data)
                break
            log.info('submitted data to broker')


def setup_receiver(channel, data_queue):
    def process_data(channel, method, properties, body):
        log.info('received data from broker')
        data_queue.put(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def on_queue_declared(frame):
        channel.basic_consume(process_data, queue='sandbox')

    channel.queue_declare(
        queue='sandbox', durable=True, callback=on_queue_declared)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print 'usage: %s RABBITMQ_HOST' % sys.argv[0]
        sys.exit()

    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)

    host = sys.argv[1]
    log.info('connecting to host: %s', host)
    parameters = pika.ConnectionParameters(host=host, heartbeat=True)
    data_queue = Queue.Queue(0)
    data_queue.put('message') # prime the pump

    # run submitter in a thread

    setup = partial(setup_submitter, data_queue=data_queue)
    broker = Broker(parameters, setup, 'submitter')
    thread = threading.Thread(target=
         partial(broker.connect, forever=True))

    # uncomment these lines to use the blocking variant of the submitter
    #thread = threading.Thread(target=
    #    partial(blocking_submitter, parameters, data_queue))

    thread.daemon = True
    thread.start()

    # run receiver in main thread
    setup = partial(setup_receiver, data_queue=data_queue)
    broker = Broker(parameters, setup, 'receiver')
    broker.connect(forever=True)
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top