Question

The docs of both eventlet and gevent have several examples of how to asynchronously spawn IO tasks and get the results later. But in all the examples where a value is returned from the async call, I always find a blocking call after all the calls to spawn(): join(), joinall(), wait(), or waitall(). This assumes that spawning the IO-bound functions is immediate, so we can jump right to the point where we wait for the results.
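
For reference, the pattern I mean looks roughly like this in gevent (fetch_title and urls are placeholder names):

import gevent

# spawn everything up front...
jobs = [gevent.spawn(fetch_title, url) for url in urls]
# ...then block until every job has finished
gevent.joinall(jobs)
titles = [job.value for job in jobs]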

But in my case I want to get the jobs from a generator that can be slow, arbitrarily large, or even infinite.

I obviously can't do this:

pile = eventlet.GreenPile(pool)
for url in mybiggenerator():
    pile.spawn(fetch_title, url)
titles = '\n'.join(pile)

because mybiggenerator() can take a long time to be exhausted. So I have to start consuming the results while I am still spawning async calls.

This is presumably usually done with queues, but I'm not really sure how. Say I create a queue to hold jobs, push a bunch of jobs from a greenlet called P, and pop them from another greenlet C. When in C, if I find that the queue is empty, how do I know whether P has pushed every job it had to push or whether it is just in the middle of an iteration?
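
The only convention I can think of is a sentinel value that P pushes when it is done, something like this sketch (DONE and the function bodies are made up):

import eventlet.queue

DONE = object()  # unique sentinel: tells C that P has finished

def produce(q):  # greenlet P
    for url in mybiggenerator():
        q.put(url)
    q.put(DONE)

def consume(q):  # greenlet C
    while True:
        job = q.get()  # blocks while the queue is merely empty
        if job is DONE:
            break
        # ...process job here...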

Alternatively, eventlet allows me to loop through a pile to get the return values, but can I start doing this without having spawned all the jobs I have to spawn? How? This would be a simpler alternative.
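
What I am imagining is something like GreenPool.imap, assuming it really does pull from the iterable lazily as workers free up (a sketch reusing the names from above; the pool size is arbitrary):

import eventlet

pool = eventlet.GreenPool(100)
# results become available (in order) while the generator is still being consumed
for title in pool.imap(fetch_title, mybiggenerator()):
    print(title)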

Solution

You don't need any pool or pile by default; they're just convenient wrappers for implementing a particular strategy. First you should get a clear idea of how exactly your code must work under all circumstances, that is: when and why you start another greenthread, and when and why you wait for something.

When you have answers to some of these questions and doubts about others, ask away. In the meantime, here's a prototype that processes an infinite "generator" (actually a queue).

import eventlet
import eventlet.queue
import eventlet.semaphore


queue = eventlet.queue.Queue(10000)
wait = eventlet.semaphore.CappedSemaphore(1000)


def fetch(url):
    # httplib2.Http().request
    # or requests.get
    # or urllib.urlopen
    # or whatever API you like
    return response


def crawl(url):
    with wait:
        response = fetch(url)
        links = parse(response)  # parse() is your own link extraction
        for link in links:
            queue.put(link)


def spawn_crawl_next():
    try:
        url = queue.get(block=False)
    except eventlet.queue.Empty:
        return False
    # use another CappedSemaphore here to limit number of outstanding connections
    eventlet.spawn(crawl, url)
    return True


def crawler():
    while True:
        if spawn_crawl_next():
            continue

        # queue is empty; wait for all in-flight `crawl` greenthreads to finish
        while wait.balance != 0:
            eventlet.sleep(1)

        # if the last spawned `crawl` enqueued more links -- process them
        if not spawn_crawl_next():
            break


def main():
    queue.put('http://initial-url')
    crawler()
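
A note on the design, as I read the prototype: the "with wait:" block in crawl keeps wait.balance nonzero while a crawl is in flight, so the while wait.balance != 0 loop is what lets crawler() tell "the queue is momentarily empty" apart from "all work is finished" -- exactly the producer/consumer ambiguity raised in the question.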

OTHER TIPS

Re: "concurrent.futures from Python3 does not really apply to "eventlet or gevent" part."

In fact, eventlet can be combined with concurrent.futures to run a ThreadPoolExecutor with green-thread workers.

See: https://github.com/zopefiend/green-concurrent.futures-with-eventlet/commit/aed3b9f17ac27eeaf8c56210e0c8e4aff2ecbdb5
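
The sketch below shows the general idea only, not the linked patch: after eventlet.monkey_patch(), threads created through the threading module are green threads, so a ThreadPoolExecutor's workers become greenthreads. Whether stock concurrent.futures works unmodified this way depends on your Python and eventlet versions (some of its internals can escape monkey-patching), which is presumably what the linked commit addresses.

import eventlet
eventlet.monkey_patch()  # must run before the pool is created

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_body(url):  # placeholder task
    return urllib.request.urlopen(url).read(100)

# worker "threads" are green threads after monkey-patching
with ThreadPoolExecutor(max_workers=50) as ex:
    fut = ex.submit(fetch_body, 'http://example.com')
    print(fut.result())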

I had the same problem and it has been super difficult to find any answers.

I think I managed to get something working by having a consumer run on a separate thread and using a threading.Event for synchronization. It seems to work fine.

The only caveat is that you have to be careful with monkey-patching: if you monkey-patch the threading facilities, this will probably not work.

import gevent
import gevent.exceptions  # for LoopExit, caught below
import gevent.queue
import threading
import time


q = gevent.queue.JoinableQueue()
queue_not_empty = threading.Event()


def run_task(task):
    print(f"Started task {task} @ {time.time()}")
    # Use whatever has been monkey-patched with gevent here
    gevent.sleep(1)
    print(f"Finished task {task} @ {time.time()}")


def consumer():
    while True:
        print("Waiting for item in queue")
        queue_not_empty.wait()
        try:
            task = q.get()
            print(f"Dequed task {task} for consumption @ {time.time()}")
        except gevent.exceptions.LoopExit:
            queue_not_empty.clear()
            continue
        try:
            gevent.spawn(run_task, task)
        finally:
            q.task_done()
        gevent.sleep(0)  # Kickstart task


def enqueue(item):
    q.put(item)
    queue_not_empty.set()


# Run consumer on separate thread
consumer_thread = threading.Thread(target=consumer, daemon=True)
consumer_thread.start()

# Add some tasks
for i in range(5):
    enqueue(i)
time.sleep(2)

Output:

Waiting for item in queue
Dequeued task 0 for consumption @ 1643232632.0220542
Started task 0 @ 1643232632.0222237
Waiting for item in queue
Dequeued task 1 for consumption @ 1643232632.0222733
Started task 1 @ 1643232632.0222948
Waiting for item in queue
Dequeued task 2 for consumption @ 1643232632.022315
Started task 2 @ 1643232632.02233
Waiting for item in queue
Dequeued task 3 for consumption @ 1643232632.0223525
Started task 3 @ 1643232632.0223687
Waiting for item in queue
Dequeued task 4 for consumption @ 1643232632.022386
Started task 4 @ 1643232632.0224123
Waiting for item in queue
Finished task 0 @ 1643232633.0235817
Finished task 1 @ 1643232633.0236874
Finished task 2 @ 1643232633.0237293
Finished task 3 @ 1643232633.0237558
Finished task 4 @ 1643232633.0237799
Waiting for item in queue

With the new concurrent.futures module in Py3k, I would say (assuming that the processing you want to do is actually something more complex than join):

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=foo) as wp:
    res = [wp.submit(fetchtitle, url) for url in mybiggenerator()]
    # as_completed() yields each future as it finishes; .result() gets its value
    ans = '\n'.join(fut.result() for fut in concurrent.futures.as_completed(res))

This will allow you to start processing results before all of your fetchtitle calls complete. However, it will require you to exhaust mybiggenerator before you can start collecting results -- it's not clear how you want to get around this, unless you set some max_urls parameter or similar. That is something you could also do with your original implementation, though; one way to bound it is to submit jobs in batches, as sketched below.
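
For instance, a bounded-batch variant (map_in_batches and its parameters are hypothetical names of mine):

import concurrent.futures
import itertools

def map_in_batches(func, gen, batch_size=100, max_workers=20):
    # Submit a bounded window of jobs at a time, so the generator is
    # consumed incrementally instead of being exhausted up front.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as wp:
        while True:
            batch = list(itertools.islice(gen, batch_size))
            if not batch:
                break
            futures = [wp.submit(func, item) for item in batch]
            for fut in concurrent.futures.as_completed(futures):
                yield fut.result()

# results start flowing after the first batch, not after the whole generator:
# titles = '\n'.join(map_in_batches(fetchtitle, mybiggenerator()))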

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow