Kombu-python - force blocking/synchronous behavior (or processing a message only when the previous finished)

StackOverflow https://stackoverflow.com/questions/22118978

  •  18-10-2022
  •  | 
  •  

Question

I have Kombu processing a rabbitmq queue and calling django functions/management commands etc. My problem is that I have an absolute requirement for correct order of execution. tha handler for message 3 can never run before the handler for message1 and 2 is finished. I need to ensure Kombu doesn't process another message before I finish processing the previous one:

Consider this base class

class UpdaterMixin(object):
    #  binding management commands to event names
    #  override in subclass
    event_handlers = {}
    app_name = ''   #override in subclass

    def __init__(self):
        if not self.app_name or len(self.event_handlers) == 0:
            print('app_name or event_handlers arent implemented')
            raise NotImplementedError()
        else:
            self.connection_url = settings.BROKER_URL
            self.exchange_name = settings.BUS_SETTINGS['exchange_name']
            self.exchange_type = settings.BUS_SETTINGS['exchange_type']
            self.routing_key = settings.ROUTING_KEYS[self.app_name]

    def start_listener(self):
        logger.info('started %s updater listener' % self.app_name)\\
        with Connection(self.connection_url) as connection:
            exchange = Exchange(self.exchange_name, self.exchange_type, durable=True)
            queue = Queue('%s_updater' % self.app_name, exchange=exchange, routing_key=self.routing_key)
            with connection.Consumer(queue, callbacks=[self.process_message]) as consumer:
                while True:
                    logger.info('Consuming events')
                    connection.drain_events()

    def process_message(self, body, message):


        logger.info('data received: %s' % body)
        handler = self.event_handlers[body['event']]
        logger.info('Executing management command: %s' % str(handler))
        data = json.dumps(body)
        call_command(handler, data, verbosity=3, interactive=False)
        message.ack()

Is there a way to force kombu for this kind of behavior? I don't care if the lock would be in not draining another event until processing is done or not running another process_message until the previous is finished, or any other method to acheive this. I just need to make sure execution order is strictly maintained.

I'll be glad for any help with this.

Was it helpful?

Solution

Just figured out the since python is single threaded by default, then this code is blocking/synchronous by default unless I explicitly rewrite it to be async. If anyone bumps into this

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top