Question

Is there a pattern in Tornado (or asyncio) for waiting on any one of a list of Futures, rather than all of them?

yield any_of([future1, future2, future3])

Say future2 is ready first; then the result should be:

[None, <result>, None]

Solution

Update: Tornado now has tornado.gen.WaitIterator. Use it as shown in its documentation, rather than my idea below.
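Since the question also asks about asyncio, here is a minimal sketch of the same idea using only the standard library's asyncio.wait with return_when=FIRST_COMPLETED. The helper names any_of and delayed_msg and the short delays are illustrative assumptions, not part of any library, and the sketch assumes Python 3.7 or later.

```python
import asyncio


async def delayed_msg(seconds, msg):
    # Stand-in for real work: sleep, then return a message.
    await asyncio.sleep(seconds)
    return msg


async def any_of(awaitables):
    """Wait until at least one task finishes.

    Returns (results, tasks), where results holds None for every task
    that is still pending. A failed task re-raises from result().
    """
    tasks = [asyncio.ensure_future(a) for a in awaitables]
    done, _pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    results = [t.result() if t in done else None for t in tasks]
    return results, tasks


async def main():
    results, tasks = await any_of([
        delayed_msg(0.2, 'a'),
        delayed_msg(0.05, 'b'),
        delayed_msg(0.3, 'c'),
    ])
    for t in tasks:
        t.cancel()  # this sketch simply abandons the slower tasks
    return results


first_results = asyncio.run(main())
print(first_results)  # [None, 'b', None]
```

This reproduces the list shape asked for in the question, so it shares the drawback that a pending task and a task that returned None look the same.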

You could make an Any class that inherits from Future, and wraps a list of futures. The Any class waits until one of its futures resolves, then gives you the list of results:

import time
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.concurrent import Future


@gen.coroutine
def delayed_msg(seconds, msg):
    # gen.sleep replaces the older gen.Task/add_timeout idiom,
    # which was removed in Tornado 6.
    yield gen.sleep(seconds)
    raise gen.Return(msg)


class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        self.futures = futures
        for future in futures:
            future.add_done_callback(self.done_callback)

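    def done_callback(self, future):
        # Only the first completion resolves Any. Later callbacks are ignored,
        # because setting a result twice is an error on current Tornado futures.
        if self.done():
            return
        try:
            self.set_result(self.make_result())
        except Exception as e:
            self.set_exception(e)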

    def make_result(self):
        """A list of results: None for each pending future, a result for
        each resolved future. Raises an exception for the first future
        that has an exception.
        """
        return [f.result() if f.done() else None
                for f in self.futures]

    def clear(self):
        """Break reference cycle with any pending futures."""
        self.futures = None


@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')
    results = yield Any([future1, future2, future3])
    end = time.time()
    print("finished in %.1f sec: %r" % (end - start, results))

    results = yield Any([future1, future2])
    end = time.time()
    print("finished in %.1f sec: %r" % (end - start, results))

IOLoop.current().run_sync(f)

As expected, this prints:

finished in 1.0 sec: [None, None, '1']
finished in 2.0 sec: ['2', None]

But you can see there are some complications with this approach. For one thing, if you want to wait for the rest of the futures after the first one resolves, it's complicated to construct the list of still-pending futures. I suppose you could do:

results = yield Any([f for f in [future1, future2, future3] if not f.done()])

Not pretty, and not even correct! There's a race condition. If a future is resolved between consecutive executions of yield Any(...), then you'll never receive its result. The first yield doesn't get the future's result because it's still pending, but the second yield doesn't get its result, either, because by that point the future is "done" and it isn't included in the list passed to Any.

Another complication is that Any refers to each future, which refers to a callback which refers back to the Any. For prompt garbage collection, you should call Any.clear().

Additionally, you can't distinguish between a pending future and a future that resolved to None. You'd need a special sentinel value distinct from None to represent a pending future.
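A minimal sketch of the sentinel idea, under some assumptions: PENDING and snapshot are hypothetical names, and the example uses plain asyncio futures so that it runs without Tornado (the done() and result() calls are the same ones used in make_result above).

```python
import asyncio

# Hypothetical sentinel: a unique object that no future can resolve to.
PENDING = object()


def snapshot(futures):
    """PENDING for each unfinished future, its result otherwise."""
    return [f.result() if f.done() else PENDING for f in futures]


async def main():
    loop = asyncio.get_running_loop()
    resolved_to_none = loop.create_future()
    still_pending = loop.create_future()
    resolved_to_none.set_result(None)
    return snapshot([resolved_to_none, still_pending])


states = asyncio.run(main())
print([s is PENDING for s in states])  # [False, True]
```

Comparing with "is PENDING" rather than "is None" keeps a future that legitimately resolved to None distinguishable from one that has not finished.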

The final complication is the worst. If multiple futures are resolved and some of them have exceptions, there's no obvious way for Any to communicate all that information to you. Mixing exceptions and results in a list would be perverse.

I think there's a simpler way. We can make Any return just the first future that resolves, instead of a list of results:

class Any(Future):
    def __init__(self, futures):
        super(Any, self).__init__()
        for future in futures:
            future.add_done_callback(self.done_callback)

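    def done_callback(self, future):
        # Resolve with the first future to finish; ignore later completions,
        # since a future's result can only be set once.
        if not self.done():
            self.set_result(future)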

The reference cycle is gone, and the exception-handling question is answered: The Any class returns the whole future to you, instead of its result or exception. You can inspect it as you like. It's also easy to wait for the remaining futures after some are resolved:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_msg(3, '3')
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        print("finished in %.1f sec: %r" % (end - start, resolved.result()))
        futures.remove(resolved)

As desired, this prints:

finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: '3'
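For comparison, the same "handle each future as it finishes" loop can be sketched with standard-library asyncio alone. This is an assumption-laden illustration, not the Tornado approach above: delayed_msg is redefined as a native coroutine, the delays are shortened, and Python 3.7 or later is assumed.

```python
import asyncio


async def delayed_msg(seconds, msg):
    await asyncio.sleep(seconds)
    return msg


async def main():
    pending = {
        asyncio.ensure_future(delayed_msg(0.2, '2')),
        asyncio.ensure_future(delayed_msg(0.3, '3')),
        asyncio.ensure_future(delayed_msg(0.1, '1')),
    }
    order = []
    while pending:
        # asyncio.wait returns the finished tasks and the ones still running,
        # so the pending set is rebuilt on every pass with no gap in between.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            order.append(task.result())
    return order


completion_order = asyncio.run(main())
print(completion_order)  # ['1', '2', '3']
```

Because asyncio.wait hands back both the done and the pending sets, there is no window in which a future can finish unnoticed, which is the race described earlier for the filter-by-done() approach.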

We can test the exception-handling behavior by adding a new global function:

@gen.coroutine
def delayed_exc(seconds, msg):
    yield gen.Task(IOLoop.current().add_timeout,
                   time.time() + seconds)
    raise Exception(msg)

And using it in place of delayed_msg for one of the futures:

@gen.coroutine
def f():
    start = time.time()
    future1 = delayed_msg(2, '2')
    future2 = delayed_exc(3, '3')  # Exception!
    future3 = delayed_msg(1, '1')

    futures = set([future1, future2, future3])
    while futures:
        resolved = yield Any(futures)
        end = time.time()
        try:
            outcome = resolved.result()
        except Exception as e:
            outcome = e

        print("finished in %.1f sec: %r" % (end - start, outcome))
        futures.remove(resolved)

Now the script prints '1', then '2', then the repr of the exception: Exception('3',) on Python 2, or Exception('3') on Python 3.7 and later.
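Because Any hands back the whole future, you can also inspect it without a try/except by calling its exception() method first. The sketch below is only an illustration: describe is a hypothetical helper, and it uses plain asyncio futures so it runs without Tornado.

```python
import asyncio


def describe(future):
    """Return ('error', exc) or ('ok', value) for a finished future."""
    exc = future.exception()
    if exc is not None:
        return ('error', exc)
    return ('ok', future.result())


async def main():
    loop = asyncio.get_running_loop()
    succeeded = loop.create_future()
    failed = loop.create_future()
    succeeded.set_result('2')
    failed.set_exception(Exception('3'))
    return [describe(succeeded), describe(failed)]


outcomes = asyncio.run(main())
print([kind for kind, _ in outcomes])  # ['ok', 'error']
```

Note that exception() is only safe to call on a future that is already done, which is always the case for the future that Any resolves to.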

Licensed under: CC-BY-SA with attribution