Question

Based on my previously asked question:

check a list constantly and do something if list has items

I implemented this solution:

workq = gevent.queue.Queue()
def workqueue():
    while True:
        item = workq.get()
        if gevent.spawn(startWork, item).value != "Ok":
            workq.put(item)

The idea is that each queue item is handed to a worker greenlet as soon as it arrives, so that startWork (which takes 1-10 minutes) runs on all items from the queue concurrently. The problem is that the return value is checked immediately and is always None. I can prevent this by putting a join before the value check, but then I can only process one item from the queue at a time, because I always have to wait for the result. So my question is: is there a way to spawn the greenlets as the items arrive and, when each one finishes, check its result and, if it is not "Ok", put the item back on the queue to be processed again?


Solution

So my question is: is there a way to spawn the greenlets as the items arrive and, when each one finishes, check its result and, if it is not "Ok", put the item back on the queue to be processed again?

What you're looking for is a way to join a whole batch of workers. You can do this with a Group or Pool of workers.
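A minimal sketch of the batch approach, assuming gevent is installed. Here start_work is a hypothetical stand-in for startWork (it just sleeps briefly and fails for odd items), and the pool size of 10 is arbitrary. Note that the results only become available after the whole batch has been joined:

```python
import gevent
from gevent.pool import Pool

def start_work(item):
    # Hypothetical stand-in for startWork: odd items "fail".
    gevent.sleep(0.01)
    return "Ok" if item % 2 == 0 else "Failed"

def run_batch(items, size=10):
    pool = Pool(size)  # at most `size` greenlets run at once
    greenlets = [pool.spawn(start_work, item) for item in items]
    pool.join()  # blocks until every greenlet in the pool has finished
    # Only now is each greenlet's .value guaranteed to be set.
    return [item for item, g in zip(items, greenlets) if g.value != "Ok"]

failed = run_batch([1, 2, 3, 4])
```

The items in failed could then be put back on the queue for another round.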

But you want to get the values back as they occur, not all at once when it's done. For that, you want something like a future. Unlike threads and processes, as far as I know, greenlets haven't yet been given a full future/executor abstraction. Greenlets themselves are sort of like futures, but there is nothing like wait(iterable_of_futures, FIRST_COMPLETED) to call on them.
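For comparison, this is roughly what the future-based pattern looks like with threads, using the standard library's concurrent.futures. It uses threads rather than greenlets, so it only illustrates the abstraction being described. start_work and the attempts counter are hypothetical stand-ins: odd items fail on their first attempt and succeed on the retry.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

attempts = {}

def start_work(item):
    # Hypothetical stand-in for startWork: odd items fail once, then succeed.
    attempts[item] = attempts.get(item, 0) + 1
    time.sleep(0.01)
    if item % 2 == 1 and attempts[item] == 1:
        return "Failed"
    return "Ok"

def run_all(items):
    completed = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Map each pending future back to the item it is working on.
        pending = {executor.submit(start_work, item): item for item in items}
        while pending:
            # Returns as soon as at least one future has finished.
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                item = pending.pop(future)
                if future.result() == "Ok":
                    completed.append(item)
                else:
                    # Resubmit the failed item without waiting for the others.
                    pending[executor.submit(start_work, item)] = item
    return completed

completed = run_all([1, 2, 3, 4])
```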

Alternatively, you can just add an "output queue", make each task push its results on the output queue, and then just pull all the values off the output queue until you're done. But how do you wait on two queues at once? The obvious way to do it is with two greenlets.
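A rough sketch of the two-greenlet version, assuming gevent is installed. The names resultq, task, dispatcher and collector are made up for illustration, and start_work is a hypothetical stand-in that fails odd items on their first attempt. The expected count passed to collector exists only so that the example terminates; in the original setup both loops would simply run forever.

```python
import gevent
from gevent.queue import Queue

workq = Queue()    # items waiting to be worked on
resultq = Queue()  # (item, result) pairs pushed by finished tasks
attempts = {}

def start_work(item):
    # Hypothetical stand-in for startWork: odd items fail once, then succeed.
    attempts[item] = attempts.get(item, 0) + 1
    gevent.sleep(0.01)
    return "Ok" if attempts[item] > 1 or item % 2 == 0 else "Failed"

def task(item):
    # Each task reports its own result on the output queue.
    resultq.put((item, start_work(item)))

def dispatcher():
    # First greenlet: spawn a task for every item as it arrives.
    while True:
        item = workq.get()
        gevent.spawn(task, item)

def collector(expected, finished):
    # Second greenlet: check results as they come in, requeue failures.
    while len(finished) < expected:
        item, value = resultq.get()
        if value == "Ok":
            finished.append(item)
        else:
            workq.put(item)

items = [1, 2, 3]
for item in items:
    workq.put(item)

finished = []
d = gevent.spawn(dispatcher)
c = gevent.spawn(collector, len(items), finished)
c.join()  # wait until every item has succeeded
d.kill()  # the dispatcher loops forever, so stop it explicitly
```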

Or, simplest of all, just wrap the task so it re-appends the item at the end:

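def workqueue():
    def wrappedWork(item):
        # Runs inside the spawned greenlet, so waiting here blocks only this greenlet.
        value = startWork(item)
        if value != "Ok":
            # The work failed: put the item back so it gets picked up again.
            workq.put(item)
    while True:
        item = workq.get()  # blocks until an item is available
        # Spawn and move on; wrappedWork checks the result itself.
        gevent.spawn(wrappedWork, item)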
Licensed under: CC-BY-SA with attribution