Kombu-python - force blocking/synchronous behavior (or processing a message only when the previous finished)

StackOverflow https://stackoverflow.com/questions/22118978

  •  18-10-2022
  •  | 
  •  

Domanda

I have Kombu processing a rabbitmq queue and calling django functions/management commands etc. My problem is that I have an absolute requirement for correct order of execution. tha handler for message 3 can never run before the handler for message1 and 2 is finished. I need to ensure Kombu doesn't process another message before I finish processing the previous one:

Consider this base class

class UpdaterMixin(object):
    #  binding management commands to event names
    #  override in subclass
    event_handlers = {}
    app_name = ''   #override in subclass

    def __init__(self):
        if not self.app_name or len(self.event_handlers) == 0:
            print('app_name or event_handlers arent implemented')
            raise NotImplementedError()
        else:
            self.connection_url = settings.BROKER_URL
            self.exchange_name = settings.BUS_SETTINGS['exchange_name']
            self.exchange_type = settings.BUS_SETTINGS['exchange_type']
            self.routing_key = settings.ROUTING_KEYS[self.app_name]

    def start_listener(self):
        logger.info('started %s updater listener' % self.app_name)\\
        with Connection(self.connection_url) as connection:
            exchange = Exchange(self.exchange_name, self.exchange_type, durable=True)
            queue = Queue('%s_updater' % self.app_name, exchange=exchange, routing_key=self.routing_key)
            with connection.Consumer(queue, callbacks=[self.process_message]) as consumer:
                while True:
                    logger.info('Consuming events')
                    connection.drain_events()

    def process_message(self, body, message):


        logger.info('data received: %s' % body)
        handler = self.event_handlers[body['event']]
        logger.info('Executing management command: %s' % str(handler))
        data = json.dumps(body)
        call_command(handler, data, verbosity=3, interactive=False)
        message.ack()

Is there a way to force kombu for this kind of behavior? I don't care if the lock would be in not draining another event until processing is done or not running another process_message until the previous is finished, or any other method to acheive this. I just need to make sure execution order is strictly maintained.

I'll be glad for any help with this.

È stato utile?

Soluzione

Just figured out the since python is single threaded by default, then this code is blocking/synchronous by default unless I explicitly rewrite it to be async. If anyone bumps into this

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top