Question

I have a long_task function which runs a heavy cpu-bound calculation and I want to make it asynchronous by using the new asyncio framework. The resulting long_task_async function uses a ProcessPoolExecutor to offload work to a different process to not be constrained by the GIL.

The trouble is that for some reason the concurrent.futures.Future instance returned from ProcessPoolExecutor.submit when yielded from throws a TypeError. Is this by design? Are those futures not compatible with asyncio.Future class? What would be a workaround?

I also noticed that generators are not picklable so submitting a couroutine to the ProcessPoolExecutor is going to fail. Is there any clean solution to this as well?

import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
                                                 # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print( n )

loop.run_until_complete(main())
Was it helpful?

Solution

You want to use loop.run_in_executor, which uses a concurrent.futures executor, but maps the return value to an asyncio future.

The original asyncio PEP suggests that concurrent.futures.Future may someday grow a __iter__ method so it can be used with yield from as well, but for now the library has been designed to only require yield from support and nothing more. (Otherwise some code wouldn't actually work in 3.3.)

OTHER TIPS

We can wrap a concurrent.futures.Future into an asyncio.future by calling asyncio.wrap_future(Future). I tried it with the below code. Works fine

from asyncio import coroutine
import asyncio
from concurrent import futures


def do_something():
    ls = []
    for i in range(1, 1000000):
        if i % 133333 == 0:
            ls.append(i)
    return ls


@coroutine
def method():
    with futures.ProcessPoolExecutor(max_workers=10) as executor:
        job = executor.submit(do_something)
        return (yield from asyncio.wrap_future(job))

@coroutine
def call_method():
    result = yield from method()
    print(result)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(call_method())
    finally:
        loop.close()


if __name__ == '__main__':
    main()
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top