Question

I have a program that takes in a list. For each value in this list, it retrieves another list and processes this other list.

Basically, it's a tree of depth 3 that needs to do possibly expensive processing at each node.

Each node needs to be able to process the results of its children.

What I'd like to do is map from the inputs in the first-layer list to the results of each node. Within each of those processes, though, I would like to map over the results from the next layer down.

What I'm worried about is that each layer will have its own set of max workers. I would like them to share a single process pool if possible; otherwise there are performance hits from all of the process switching.

Is there a way, using concurrent.futures or some other method, to have each layer share the same process pool?

An example would be:

import concurrent.futures

def main():
    my_list = [1, 2, 3, 4]
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(my_function, zip(my_list, [executor] * len(my_list)))
        # process results

def my_function(args):
    input_list, executor = args
    new_list = process(input_list)  # retrieve the next layer's list
    results = executor.map(second_function, new_list)
    # process results
    # return processed results

def second_function(value):
    ...

In this way, each child process will draw from the same pool.

Or, can I do something like (but not exactly)

import concurrent.futures.ProcessPoolExecutor(max_workers = 4) as executor

and have each call to executor pull from the same process pool?

Solution

The problem is that your pool has 4 threads, but you try to wait inside maybe 20 of the tasks it runs, so there are not enough threads left to do what you want.

In other words: my_function is executed in a worker thread. That thread blocks as soon as map is called, so there is one thread less available to execute the mapped calls. Waiting on the futures is what blocks the thread.
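
Here is a minimal sketch that reproduces the deadlock with a single worker; the names inner and outer are illustrative, and the script hangs by design:

from concurrent.futures import ThreadPoolExecutor

def inner(x):
    return x * 2

def outer(executor, xs):
    # Runs in the pool's only worker thread, then blocks waiting for
    # the inner results. The inner tasks sit in the queue, but the only
    # thread that could execute them is stuck right here.
    return list(executor.map(inner, xs))

executor = ThreadPoolExecutor(max_workers=1)
print(executor.submit(outer, executor, [1, 2, 3]).result())  # hangs forever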

My solution is to use the yield and yield from statements with generators that yield futures. Instead of blocking on a future, which would tie up the thread, all futures are created and then a yield interrupts the current execution and frees the thread. The freed thread can then execute the mapped calls. Once a future is done, a registered callback executes the next() step of the generator.
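
To make that mechanism concrete, here is a minimal sketch of the resume-on-callback idea; the names drive and on_result are illustrative and do not appear in the class below:

def drive(generator, on_result):
    # Run the generator until it yields its next future.
    try:
        future = next(generator)
    except StopIteration as stop:
        on_result(stop.value)  # the generator returned its final result
    else:
        # Resume from the completion callback instead of blocking;
        # the current thread is free while the future runs.
        future.add_done_callback(lambda f: drive(generator, on_result))

The class below wraps exactly this stepping logic in generator_start and generator_next and bridges it to a Future.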

To solve the proxy problem for existing objects, this question has to be solved first: How to properly set up multiprocessing proxy objects for objects that already exist

So, given we have the following recursion to execute, a recursive parallel sum of the lists: [[1,[2,[3,3,3],2],1],0,0]

We can expect the following output:

tasks: [[1, [2, [3, 3, 3], 2], 1], 0, 0]
tasks: [1, [2, [3, 3, 3], 2], 1]
tasks: 0
tasks: 0
tasks: 1
tasks: [2, [3, 3, 3], 2]
tasks: 1
tasks: 2
tasks: [3, 3, 3]
tasks: 2
tasks: 3
tasks: 3
tasks: 3
v: 15

The following code introduces a recursion-enabled ThreadPoolExecutor:

import time
import traceback
from concurrent.futures import Future, ThreadPoolExecutor

class RecursiveThreadPoolExecutor(ThreadPoolExecutor):

    # updated version here: https://gist.github.com/niccokunzmann/9170072

    def _submit(self, fn, *args, **kwargs):
        # The original, blocking submit of ThreadPoolExecutor.
        return ThreadPoolExecutor.submit(self, fn, *args, **kwargs)

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        fn may be a generator function that yields futures. The generator is
        resumed whenever a yielded future completes, so no worker thread is
        ever blocked waiting on a future.

        Returns:
            A Future representing the given call.
        """
        real_future = Future()
        def generator_start():
            try:
                generator = fn(*args, **kwargs)
                def generator_next():
                    try:
                        try:
                            future = next(generator)
                        except StopIteration as stop:
                            # The generator returned; its return value
                            # becomes the result of the real future.
                            real_future.set_result(stop.value)
                        else:
                            if future is None:
                                # Bare yield: just reschedule the next step.
                                self._submit(generator_next)
                            else:
                                # Resume the generator once the yielded
                                # future is done; until then the worker
                                # thread is free for other tasks.
                                future.add_done_callback(
                                    lambda future: generator_next())
                    except Exception:
                        traceback.print_exc()
                self._submit(generator_next)
            except Exception:
                traceback.print_exc()
        self._submit(generator_start)
        return real_future

    def recursive_map(self, fn, *iterables, timeout=None):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: Accepted for API compatibility; not enforced here.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            Exception: If fn(*args) raises for any values.
        """
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # The yield must be hidden in a closure so that the futures are
        # submitted before the first iterator value is required.
        def result_iterator():
            yield from fs
            return fs
        return result_iterator()

if __name__ == '__main__':

    def f(args):
        executor, tasks = args
        print('tasks:', tasks)
        if isinstance(tasks, int):
            return tasks
        # Wait for all child futures without blocking the thread.
        futures = yield from executor.recursive_map(
            f, [(executor, task) for task in tasks])
        return sum(future.result() for future in futures)

    with RecursiveThreadPoolExecutor(max_workers=1) as executor:
        r = executor.map(f, [(executor, [[1, [2, [3, 3, 3], 2], 1], 0, 0])])
        time.sleep(0.1)  # let the 'tasks:' prints appear before the results

        for v in r:
            print('v: {}'.format(v))
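
Note that the demo runs with max_workers=1 on purpose: with blocking waits, a single worker would deadlock immediately (as in the sketch above), so completing the whole recursion with one thread demonstrates that no thread is ever blocked on a future.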

An updated version can be found here: https://gist.github.com/niccokunzmann/9170072

Sadly, I have not been able to implement this for processes with multiprocessing yet. You can do it, though: the only thing that should be necessary is to create proxy objects for the generator_start and generator_next functions. If you do so, please let me know.
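
In the meantime, one workaround for sharing a single ProcessPoolExecutor across layers is to keep all of the waiting in the parent process, so the workers never block on other futures. This is only a sketch, not the proxy-based solution described above, and it assumes the per-node processing can be flattened, as in the sum example; leaf and collect_leaves are illustrative names:

from concurrent.futures import ProcessPoolExecutor

def leaf(n):
    # Stand-in for the possibly expensive per-leaf processing.
    return n

def collect_leaves(tasks, out):
    # Recurse in the parent process; only leaves go to the pool.
    if isinstance(tasks, int):
        out.append(tasks)
    else:
        for task in tasks:
            collect_leaves(task, out)

if __name__ == '__main__':
    leaves = []
    collect_leaves([[1, [2, [3, 3, 3], 2], 1], 0, 0], leaves)
    with ProcessPoolExecutor(max_workers=4) as executor:
        print(sum(executor.map(leaf, leaves)))  # prints 15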

To solve the proxy problem for those methods, this question would also need to be answered: How to properly set up multiprocessing proxy objects for objects that already exist
