I had to move away from ForkJoinPool because it did not use the threads optimally. It worked fine for the Loop and Split nodes, but it no longer worked once I tried to parallelize the leaf nodes, where the actual work happens. When I add those as RecursiveTask instances, most threads sit idle. For some reason the join() call does not steal work in the leaves (jdk1.7.0_45); it just waits. In my case all the work is in the leaves, so using a custom RecursiveTask subclass for the leaves is worse than using it only for the Loop and Split nodes (because it then waits after part of the work, whereas otherwise it waits only after all the work is done). I don't think I am using ForkJoinPool incorrectly; if you google it, you will find people with similar problems.
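To make the setup concrete, here is a minimal sketch of the general shape described above: a Split node that forks one RecursiveTask per leaf and then joins them. The names ForkJoinLeafSketch, LeafTask, SplitTask and sumBelow are hypothetical, and the summing loop only stands in for the real leaf work. The sketch shows the structure only; it does not claim to reproduce the idle threads, which may depend on the JDK version and the workload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical names throughout; this only illustrates the shape of the setup.
class ForkJoinLeafSketch {

    /** Leaf node: sums the integers in [from, to) as a stand-in for real work. */
    static class LeafTask extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        LeafTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    /** Split node: forks one leaf per chunk, then joins all of them. */
    static class SplitTask extends RecursiveTask<Long> {
        private final int n;
        private final int chunk;

        SplitTask(int n, int chunk) {
            this.n = n;
            this.chunk = chunk;
        }

        @Override
        protected Long compute() {
            List<LeafTask> leaves = new ArrayList<LeafTask>();
            for (int start = 0; start < n; start += chunk) {
                LeafTask leaf = new LeafTask(start, Math.min(start + chunk, n));
                leaf.fork(); // schedule the leaf asynchronously in the pool
                leaves.add(leaf);
            }
            long total = 0;
            for (LeafTask leaf : leaves) {
                total += leaf.join(); // wait for each leaf result
            }
            return total;
        }
    }

    /** Sums 0..n-1 with the given chunk size (must be > 0) and parallelism. */
    static long sumBelow(int n, int chunk, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new SplitTask(n, chunk));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumBelow(1000, 100, 4));
    }
}
```

In the real code the leaves are much heavier than a summing loop, which is where the waiting in join() became visible.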
I now use a simple solution: two thread pools, a fixed-size one for the actual hard work and a cached one for all the Loop and Split nodes. I created FakeRecursiveTask (extend it instead of the original RecursiveTask) so that I did not have to change the code for the Loop and Split nodes. I use HardWork as the base class for the leaves, just to make clear that they are something different; to run them, just call doHardWork(work).
With this solution all my worker threads are fully used all the time. Since the tree has a limited size, I should never run out of helper threads. In practice, in my case it mostly uses the same number of helper threads as there are worker threads (8 in my case).
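
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPool3 {

    private static int maxNumWorkerThreads;
    // Fixed-size pool for the leaves (the actual hard work).
    private static ExecutorService workerPool = null;
    // Growable pool for the Loop and Split nodes, which mostly wait.
    private static ExecutorService helperPool = null;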

    public static void initThreadPool(int maxNumWorkerThreads_) {
        int availProcessors = Runtime.getRuntime().availableProcessors();
        if (maxNumWorkerThreads_ <= 0) {
            maxNumWorkerThreads_ = availProcessors;
        }
        maxNumWorkerThreads = maxNumWorkerThreads_;
        if (availProcessors != maxNumWorkerThreads) {
            System.out.println("WARN: maxNumWorkerThreads (" + maxNumWorkerThreads
                    + ") != availProcessors (" + availProcessors + ")");
        }
        workerPool = Executors.newFixedThreadPool(maxNumWorkerThreads);
        // No queueing: a submitted task is handed to a helper thread directly, or,
        // once 4 * maxNumWorkerThreads helpers are busy, runs in the calling thread.
        BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>();
        helperPool = new ThreadPoolExecutor(0, 4 * maxNumWorkerThreads, 60, TimeUnit.MINUTES,
                workQueue, Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
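
    /** Base class for the leaves, where the actual work happens. */
    public static abstract class HardWork implements Callable<Void> {
        @Override
        public abstract Void call() throws Exception;
    }

    /** Runs all tasks on the worker pool and blocks until they are done. */
    public static void doHardWork(List<HardWork> tasks) throws Exception {
        for (Future<Void> result : workerPool.invokeAll(tasks)) {
            // invokeAll alone would swallow failures; get() rethrows them
            // as an ExecutionException.
            result.get();
        }
    }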

    /**
     * Fake ForkJoinPool interface: offers the RecursiveTask methods used by
     * the Loop and Split nodes, but runs on the helper pool.
     */
    public static abstract class FakeRecursiveTask<T> implements Callable<T> {

        private Future<T> resultFuture = null;

        /** Fake interface: the work of this node. */
        public abstract T compute();

        /** Fake interface: runs compute() directly in the calling thread. */
        public T invoke() {
            return compute();
        }

        /** Fake interface: submits this task to the helper pool. */
        public void fork() {
            resultFuture = helperPool.submit(this);
        }

        /** Fake interface: waits for the result; fork() must have been called first. */
        public T join() {
            try {
                return resultFuture.get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public T call() throws Exception {
            return compute();
        }
    }

    public static void shutdownThreadPool() {
        if (workerPool != null) {
            workerPool.shutdown();
        }
        if (helperPool != null) {
            helperPool.shutdown();
        }
    }
}
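
To show how the two pools work together, here is a condensed, standalone sketch of the same idea. It does not use ThreadPool3, so that it compiles on its own. The names TwoPoolSketch, SumLeaf, SplitNode and run are hypothetical, the summing loop stands in for the real leaf work, and Executors.newCachedThreadPool() is a simplification of the bounded helper pool above. The point is that Split nodes only wait and therefore live on the helper pool, while leaves never wait and keep the fixed-size worker pool busy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical names; a simplified stand-in for the ThreadPool3 class above.
class TwoPoolSketch {

    /** Leaf: sums the integers in [from, to); runs on the worker pool. */
    static class SumLeaf implements Callable<Long> {
        private final int from;
        private final int to;

        SumLeaf(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    /** Split node: runs on the helper pool and only waits for its leaves. */
    static class SplitNode implements Callable<Long> {
        private final ExecutorService workerPool;
        private final int from;
        private final int to;
        private final int chunk;

        SplitNode(ExecutorService workerPool, int from, int to, int chunk) {
            this.workerPool = workerPool;
            this.from = from;
            this.to = to;
            this.chunk = chunk;
        }

        @Override
        public Long call() throws Exception {
            List<SumLeaf> leaves = new ArrayList<SumLeaf>();
            for (int start = from; start < to; start += chunk) {
                leaves.add(new SumLeaf(start, Math.min(start + chunk, to)));
            }
            long total = 0;
            // invokeAll blocks this helper thread until all leaves are done.
            for (Future<Long> leaf : workerPool.invokeAll(leaves)) {
                total += leaf.get();
            }
            return total;
        }
    }

    /** Sums 0..n-1 using `splits` Split nodes and `workers` worker threads. */
    static long run(int n, int splits, int chunk, int workers) throws Exception {
        ExecutorService workerPool = Executors.newFixedThreadPool(workers);
        ExecutorService helperPool = Executors.newCachedThreadPool();
        try {
            int step = (n + splits - 1) / splits; // range covered per Split node
            List<Future<Long>> parts = new ArrayList<Future<Long>>();
            for (int start = 0; start < n; start += step) {
                int end = Math.min(start + step, n);
                parts.add(helperPool.submit(new SplitNode(workerPool, start, end, chunk)));
            }
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get();
            }
            return total;
        } finally {
            workerPool.shutdown();
            helperPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(1000, 4, 50, 2));
    }
}
```

Because a leaf never submits or waits for anything, a waiting Split node cannot block a worker thread, which is why the fixed-size pool can stay small without risking a deadlock.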