Question

This is probably a question about Python callbacks as much as about using pika. I'm trying to develop some code that subscribes to a queue in RabbitMQ, processes the payload of any delivered message and then writes that payload to a series of (disk) files. So using the simple "Hello World" example at http://www.rabbitmq.com/tutorials/tutorial-one-python.html, I've added logic to the callback function (which is coincidentally called "callback") to write any received message payloads to a file.

Here's the main problem: I want to write some additional code so that, once a certain time period has elapsed, for example 300 sec (5 mins), the process closes the file, creates a new one, and writes any subsequent messages to that. And so on ...

BUT - the issue as I see it is that the callback function ONLY gets called when a message arrives in the queue. I think I need some process outside of that callback function that measures elapsed time ....

The rationale is that I want to create a set of disk files (all have unique names based on timestamp) that contain received messages in the MQ queue. If messages are slow in coming, then I close the current open file (so it can be processed further downstream) and open up another.

I also notice that after calling channel.start_consuming(), no code below it is reached - why?

I've played around with Python's multiprocessing module but no luck so far.

Here's some skeleton code with pseudo-code comments:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))

    # want to put code here to write message payloads to a file (unique name)
    # if n secs have elapsed then close the file and create a new file

# Signature used by pika releases before 1.0; pika 1.x expects
# basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.basic_consume(callback, queue='hello', no_ack=True)

channel.start_consuming()

Thanks!


Solution

It might be worth taking a look at an alternative implementation to Pika. Pika's BlockingConnection is blocking by nature (start_consuming() does not return while the consumer is running, which is why no code below it is reached), and that makes it difficult to create something like this. You would essentially need another thread to watch the IO and, if nothing has been written within the last five minutes, close the file.
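The watcher-thread idea could be sketched roughly as follows. This is only an illustration under some assumptions: the class name IdleCloser, the open_file factory and the injectable clock are all hypothetical, and the watcher thread only ever touches the file, never the pika connection or channel.

```python
import threading
import time


class IdleCloser:
    """Close a shared file when nothing has been written for max_idle seconds."""

    def __init__(self, open_file, max_idle=300, clock=time.time):
        self.open_file = open_file      # hypothetical factory returning a new file object
        self.max_idle = max_idle
        self.clock = clock              # injectable so the logic can be tested
        self.lock = threading.Lock()    # callback and watcher both touch self.file
        self.file = None
        self.last_write = None

    def write(self, body):
        """Called from the consumer callback for every message."""
        with self.lock:
            if self.file is None:
                self.file = self.open_file()
            self.file.write(body)
            self.last_write = self.clock()

    def check_idle(self):
        """Close the current file if it has been idle too long; return True if closed."""
        with self.lock:
            if self.file is None or self.clock() - self.last_write < self.max_idle:
                return False
            self.file.close()
            self.file = None
            return True

    def start_watching(self, interval=1.0):
        """Run check_idle() every `interval` seconds on a daemon thread."""
        stop = threading.Event()

        def loop():
            # Event.wait returns False on timeout, True once stop.set() is called.
            while not stop.wait(interval):
                self.check_idle()

        threading.Thread(target=loop, daemon=True).start()
        return stop  # call stop.set() to end the watcher
```

In the skeleton from the question, the callback would call write(body), and start_watching() would be called once before channel.start_consuming(). The lock matters because the callback and the watcher run on different threads.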

You could also keep a timestamp and, each time the callback fires, check whether enough time has passed; if so, close the file and create a new one. The drawback is that a file can stay open for longer than five minutes when no messages arrive, but no file will ever contain more than five minutes' worth of data.
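A minimal sketch of the timestamp approach, assuming Python 3 (where the message body arrives as bytes). The class name TimedFileWriter, the file naming scheme and the closed_paths list are hypothetical choices made for illustration.

```python
import os
import time


class TimedFileWriter:
    """Append payloads to a file, starting a new file once max_age seconds pass."""

    def __init__(self, directory, max_age=300, clock=time.time):
        self.directory = directory
        self.max_age = max_age
        self.clock = clock          # injectable so the logic can be tested
        self.file = None
        self.opened_at = None
        self.closed_paths = []      # files ready for downstream processing

    def _open_new(self, now):
        # Hypothetical naming scheme: one file per start timestamp (milliseconds).
        name = "messages-%d.log" % int(now * 1000)
        self.file = open(os.path.join(self.directory, name), "ab")
        self.opened_at = now

    def close(self):
        if self.file is not None:
            self.file.close()
            self.closed_paths.append(self.file.name)
            self.file = None

    def write(self, body):
        now = self.clock()
        # Rotation is only checked here, i.e. when a message actually arrives.
        if self.file is not None and now - self.opened_at >= self.max_age:
            self.close()
        if self.file is None:
            self._open_new(now)
        self.file.write(body + b"\n")
        self.file.flush()
```

The callback would then reduce to a single writer.write(body) call on a TimedFileWriter created before consuming starts. As noted above, a quiet queue leaves the last file open until the next message arrives or close() is called explicitly.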

However, I would recommend that you take a look at Puka instead. It is a non-blocking alternative to Pika that would make it easier to implement a solution to your problem.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow