Question

The setup:

I am in the process of changing the way a program works under the hood. The current version works like this:

public void threadWork( List<MyCallable> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst);
    List<Future<myOutput>> returnValues = new ArrayList<Future<myOutput>>();
    List<myOutput> finishedStuff = new ArrayList<myOutput>();

    for( int i = 0; i < workQueue.size(); i++ )
    {
        returnValues.add( pool.submit( workQueue.get(i) ) );
    }

    while( !returnValues.isEmpty() )
    {
        try
        {
            // Future.get() waits for a value from the callable
            finishedStuff.add( returnValues.remove(0).get() );
        }
        catch(Throwable iknowthisisbaditisjustanexample){}
    }

    doLotsOfThings(finishedStuff);
}

But the new system is going to use a private inner Runnable to call a synchronized method that writes the data into a global variable. My basic setup is:

public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    List<MyRunnable> runnables = new ArrayList<MyRunnable>();

    for ( int i = 0; i < workQueue.size(); i++ )
    {
        runnables.add( new MyRunnable( workQueue.get(i) ) );
        pool.submit( runnables.get(i) );
    }

    while( !runnables.isEmpty() )
    {
        try 
        {
            runnables.remove(0).wait(); // I realized that this wouldn't work
        } 
        catch(Throwable iknowthisisbaditisjustanexample){}
    }
    doLotsOfThings(finishedStuff); // finishedStuff is the global the Runnables write to
}

If you read my comment in the try of the second piece of code you will notice that I don't know how to use wait(). I had thought it was basically like thread.join() but after reading the documentation I see it is not.

I'm okay with changing some structure as needed, but the basic system of taking work, using runnables, having the runnables write to a global variable, and using a threadpool are requirements.


The Question

How can I wait for the threadpool to be completely finished before I doLotsOfThings()?

Solution

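You should call ExecutorService.shutdown() and then ExecutorService.awaitTermination(). shutdown() stops the pool from accepting new tasks while letting the already submitted ones finish; awaitTermination() then blocks until they have all completed or the timeout elapses.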

...
pool.shutdown();
if (pool.awaitTermination(<long>,<TimeUnit>)) {
    // finished before timeout
    doLotsOfThings(finishedStuff);
} else {
    // Timeout occurred.
}
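
For a fuller picture, here is a minimal sketch of the same idea applied to the setup in the question. The class name AwaitTerminationSketch, the inner ReportTask, the pool size of 4, the 60-second timeout and the toUpperCase() placeholder work are all assumptions made for illustration; the synchronized list stands in for the global finishedStuff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AwaitTerminationSketch {

    // Stand-in for the "global variable" the Runnables write to.
    private final List<String> finishedStuff = new ArrayList<String>();

    // Synchronized so that pool threads do not corrupt the list.
    private synchronized void report(String result) {
        finishedStuff.add(result);
    }

    // Hypothetical inner Runnable; real work would replace toUpperCase().
    private class ReportTask implements Runnable {
        private final String work;

        ReportTask(String work) {
            this.work = work;
        }

        @Override
        public void run() {
            report(work.toUpperCase());
        }
    }

    public List<String> threadReports(List<String> workQueue) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // arbitrary size
        for (String work : workQueue) {
            pool.execute(new ReportTask(work));
        }

        pool.shutdown(); // no new tasks accepted; submitted ones still run
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { // arbitrary timeout
                pool.shutdownNow(); // give up on tasks that are still running
                throw new IllegalStateException("tasks did not finish before the timeout");
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }

        // Every task has completed; copy the shared list under the same lock.
        synchronized (this) {
            return new ArrayList<String>(finishedStuff);
        }
    }

    public static void main(String[] args) {
        List<String> done = new AwaitTerminationSketch().threadReports(Arrays.asList("a", "b", "c"));
        System.out.println(done.size()); // prints 3
    }
}
```

Note that the order of the entries in the shared list depends on thread scheduling, so callers should not rely on it matching the order of the work queue.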

Other tips

Try this:

pool.shutdown();
pool.awaitTermination(WHATEVER_TIMEOUT, TimeUnit.SECONDS);

Have you considered using the Fork/Join framework that is now included in Java 7? If you do not want to use Java 7 yet, you can get the jar for it here.
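
As a rough sketch of what the Fork/Join route could look like for this problem (the class name ForkJoinSketch, the ReportAction type and the toUpperCase() placeholder work are assumptions, not part of the question), one RecursiveAction per work item can be forked and joined from a root action, and ForkJoinPool.invoke() only returns once all of them are done:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ForkJoinSketch {

    // Hypothetical unit of work: one action per queue entry.
    static class ReportAction extends RecursiveAction {
        private final String work;
        private final List<String> sink;

        ReportAction(String work, List<String> sink) {
            this.work = work;
            this.sink = sink;
        }

        @Override
        protected void compute() {
            sink.add(work.toUpperCase()); // placeholder for the real work
        }
    }

    public static List<String> threadReports(final List<String> workQueue) {
        // Synchronized wrapper stands in for the question's shared variable.
        final List<String> finishedStuff =
                Collections.synchronizedList(new ArrayList<String>());

        ForkJoinPool pool = new ForkJoinPool();
        try {
            // invoke(...) blocks until the root action has completed,
            // and the root action only completes after joining its subtasks.
            pool.invoke(new RecursiveAction() {
                @Override
                protected void compute() {
                    List<ReportAction> actions = new ArrayList<ReportAction>();
                    for (String work : workQueue) {
                        actions.add(new ReportAction(work, finishedStuff));
                    }
                    invokeAll(actions); // forks all actions and joins them
                }
            });
        } finally {
            pool.shutdown();
        }
        return new ArrayList<String>(finishedStuff);
    }
}
```

Fork/Join is aimed at work that splits recursively, so for a flat list of independent tasks like this one it is not obviously better than a plain ExecutorService.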

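public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    Set<Future<?>> futures = new HashSet<Future<?>>();

    for ( int i = 0; i < workQueue.size(); i++ )
    {
        futures.add(pool.submit(new MyRunnable(workQueue.get(i))));
    }

    while( !futures.isEmpty() )
    {
        Set<Future<?>> removed = new HashSet<Future<?>>();
        for(Future<?> f : futures) {
            try {
                // Wait briefly; a task that is still running throws TimeoutException
                f.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException stillRunning) {
                // not finished yet, check again on the next pass
            } catch (ExecutionException taskFailed) {
                // the task threw; isDone() is now true, so it is removed below
            } catch (InterruptedException e) {
                // stop waiting; retrying would only throw again immediately
                Thread.currentThread().interrupt();
                return;
            }
            if(f.isDone()) removed.add(f);
        }
        futures.removeAll(removed);
    }
    doLotsOfThings(finishedStuff); // finishedStuff is the global the Runnables write to
}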

shutdown() is a lifecycle method of the ExecutorService and renders the executor unusable after the call. Creating and destroying thread pools inside a method is as bad as creating and destroying threads: it pretty much defeats the purpose of using a thread pool, which is to reduce the overhead of thread creation by enabling transparent reuse.

If possible, you should keep your ExecutorService lifecycle in sync with your application: create it when first needed and shut it down when your app is closing down.

To achieve your goal of executing a bunch of tasks and waiting for them, the ExecutorService provides the method invokeAll(Collection<? extends Callable<T>> tasks) (and the version with timeout if you want to wait a specific period of time.)

Using this method and some of the points mentioned above, the code in question becomes:

public void threadReports( List<String> workQueue ) throws InterruptedException {

    // invokeAll(...) accepts Callables, so each Runnable is wrapped with Executors.callable(...)
    List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(workQueue.size());
    for (String work : workQueue) {
        tasks.add(Executors.callable(new MyRunnable(work)));
    }
    // Executor is obtained from some applicationContext that takes care of lifecycle management
    // invokeAll(...) will block and return when all tasks have executed
    List<Future<Object>> results = applicationContext.getExecutor().invokeAll(tasks);

    // I wouldn't use a global variable unless you have a VERY GOOD reason for that,
    // because all the threads of the pool doing work will be contending for the lock on that variable.
    // doLotsOfThings(finishedStuff);

    // Note that the List of Futures holds the individual results of each execution
    // (null for a wrapped Runnable; make the tasks Callables that return their output to use this).
    // That said, the preferred way to harvest your results would be:
    doLotsOfThings(results);
}

PS: Not sure why threadReports is void. It could/should return the calculation of doLotsOfThings to achieve a more functional design.
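
To make the two suggestions above concrete (a long-lived executor, and results returned rather than written to a global), here is a minimal sketch. The names InvokeAllSketch, ReportCall and close(), the pool size of 4 and the toUpperCase() placeholder work are assumptions for illustration only, and it deliberately swaps the question's Runnables for Callables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Long-lived pool, created once and reused across calls (size is arbitrary).
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    // Hypothetical task that returns its output instead of writing to shared state.
    static class ReportCall implements Callable<String> {
        private final String work;

        ReportCall(String work) {
            this.work = work;
        }

        @Override
        public String call() {
            return work.toUpperCase(); // placeholder for the real work
        }
    }

    public List<String> threadReports(List<String> workQueue) {
        List<ReportCall> calls = new ArrayList<ReportCall>(workQueue.size());
        for (String work : workQueue) {
            calls.add(new ReportCall(work));
        }

        List<String> results = new ArrayList<String>(calls.size());
        try {
            // invokeAll blocks until every task is done; the futures are
            // returned in the same order as the tasks were supplied.
            for (Future<String> f : executor.invokeAll(calls)) {
                results.add(f.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        }
        return results;
    }

    // Call once, when the application shuts down.
    public void close() {
        executor.shutdown();
    }
}
```

Because the pool is never shut down between calls, threadReports() can be invoked repeatedly on the same instance, and no lock is needed since each task hands back its own result.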

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow