Question

How to use exactly 8 threads for the 'expensive' parts all the time?

I have a number-crunching problem for which I created a simple framework. My problem is finding an elegant, simple way to use all CPU cores optimally.

To get good performance I use a thread pool with a fixed size of 8, the idea being to use as many threads as there are hardware threads for optimal performance.

Simplified pseudo code usage of the framework is as follows:

interface Task {
  data[] compute(data[]);
}

Task task = new Loop(new Chain(new DoX(), new DoY(), new Split(2, new DoZ())));
result = task.compute(data);
  • Loop Task would loop until some termination criteria is met
  • Chain Task would chain tasks (e.g. in the above r = t1.compute(r); r = t2.compute(r); r = t3.compute(r); return r;)
  • Split Task would split the data and execute a task on the parts (e.g. create 2 parts and return new data[] {t1.compute(part1), t1.compute(part2)})
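The pseudocode could look roughly like this in Java (the `double[]` data type, the fixed-iteration `Loop`, and all class names are my assumptions for illustration; `Split` is omitted here because its threading is the whole question):

```java
import java.util.Arrays;

// A minimal sketch of the framework from the question; the concrete
// data type (double[]) and the class bodies are assumptions.
interface Task {
    double[] compute(double[] data);
}

// Chain runs its sub-tasks in sequence, feeding each result to the next.
class Chain implements Task {
    private final Task[] tasks;
    Chain(Task... tasks) { this.tasks = tasks; }
    public double[] compute(double[] data) {
        double[] r = data;
        for (Task t : tasks) r = t.compute(r);
        return r;
    }
}

// Loop repeats a task until a termination criterion is met
// (a fixed iteration count stands in for the real criterion).
class Loop implements Task {
    private final int iterations;
    private final Task body;
    Loop(int iterations, Task body) { this.iterations = iterations; this.body = body; }
    public double[] compute(double[] data) {
        double[] r = data;
        for (int i = 0; i < iterations; i++) r = body.compute(r);
        return r;
    }
}

public class FrameworkSketch {
    public static void main(String[] args) {
        Task addOne = data -> Arrays.stream(data).map(x -> x + 1).toArray();
        Task twice  = data -> Arrays.stream(data).map(x -> x * 2).toArray();
        Task pipeline = new Loop(2, new Chain(addOne, twice));
        double[] out = pipeline.compute(new double[] { 1.0 });
        // ((1+1)*2 + 1)*2 = 10
        if (out[0] != 10.0) throw new AssertionError("expected 10.0, got " + out[0]);
        System.out.println("ok");
    }
}
```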

The threading is implemented in the Split Task at the moment. So the Split Task would hand the computation of t1.compute(part1) and t1.compute(part2) to the thread pool.

Approach 1, probably complete deadlock

My first approach was for the Split Task to hold an array of futures and call get() on them one after another. But if that Split Task is nested inside another Split Task, the blocking wait in future.get() occupies the thread that the outer Split Task took from the pool, so fewer than 8 threads are really working. If the hierarchy is deep enough, nobody may be working at all and the computation waits forever.

1) I assume future.get() will not return the thread to the thread pool, right? So done like that, I will wait in future.get() with no threads left to ever start the work? [I cannot easily test that because I already changed the approach]
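The behaviour question 1 asks about can be demonstrated on a deliberately tiny pool: future.get() does not return its thread to the pool, so a task that submits to its own pool and waits starves itself. The timeout below exists only so the demo terminates; without it, the inner get() blocks forever:

```java
import java.util.concurrent.*;

public class NestedGetDemo {
    public static void main(String[] args) throws Exception {
        // A pool of size 1 makes the starvation immediate; with size 8
        // the same thing happens once the Split hierarchy is deep enough.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<String> outer = pool.submit(() -> {
            Future<String> inner = pool.submit(() -> "inner done");
            // Without the timeout this get() would block forever: the only
            // pool thread is the one waiting here, so 'inner' never starts.
            return inner.get(200, TimeUnit.MILLISECONDS);
        });
        try {
            outer.get();
            throw new AssertionError("expected starvation");
        } catch (ExecutionException e) {
            // The inner get() timed out because no pool thread was free.
            if (!(e.getCause() instanceof TimeoutException))
                throw new AssertionError("unexpected cause: " + e.getCause());
            System.out.println("inner task starved as expected");
        } finally {
            pool.shutdownNow();
        }
    }
}
```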

Approach 2, the current one, at least somebody working

My current approach (not much better) is to run the final part (partN) of a split on the current thread. When it finishes, I check whether partN-1 has already been started; if yes, I wait for all tasks in future.get(), otherwise the current thread computes partN-1 too, and if needed partN-2, and so on. That way I should always have at least one pool thread working.
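A simplified sketch of this approach (the partN-1 bookkeeping is omitted, and `splitCompute` with integer-sum jobs is made up for illustration): the calling thread runs the last part itself before it blocks on the remaining futures.

```java
import java.util.*;
import java.util.concurrent.*;

public class CallerRunsSplit {
    // Submit all parts but the last, run the last part on the calling
    // thread, then wait for the rest. The "check whether partN-1 was
    // already started, otherwise run it ourselves" refinement from the
    // question is omitted for brevity.
    static int[] splitCompute(ExecutorService pool, List<int[]> parts) throws Exception {
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < parts.size() - 1; i++) {
            int[] part = parts.get(i);
            futures.add(pool.submit(() -> sum(part)));
        }
        int[] results = new int[parts.size()];
        // The current thread stays busy with the last part instead of
        // going straight into a blocking get().
        results[parts.size() - 1] = sum(parts.get(parts.size() - 1));
        for (int i = 0; i < futures.size(); i++) {
            results[i] = futures.get(i).get(); // still blocks on deep nesting
        }
        return results;
    }

    static int sum(int[] a) { int s = 0; for (int x : a) s += x; return s; }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        int[] r = splitCompute(pool,
                Arrays.asList(new int[]{1, 2}, new int[]{3, 4}, new int[]{5}));
        pool.shutdown();
        if (!Arrays.equals(r, new int[]{3, 7, 5}))
            throw new AssertionError(Arrays.toString(r));
        System.out.println("ok");
    }
}
```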

But since the answer to question 1) is probably that future.get() blocks my threads, with this approach I will still have only a few working threads on deep hierarchies.

Approach 3, the only solution I see

I assume I must use two thread pools: a fixed-size one for the hard work and (a dynamic?) one for all the waiting.

3.a.: But that means the Split Task must only take threads from the waiting pool, while a Task doing real work spawns a thread from the work pool and waits for it to complete. Ugly, but it should work. Ugly because at the moment all threading support lives in the Split Task, whereas with this solution the Tasks doing the hard work must also know about threading.

3.b.: Another approach would be for Split to spawn worker threads, but inside Split each wait must be done by a waiting thread while the current thread also runs worker tasks in the meantime. With this, all threading support stays in the Split Task class, but I'm not sure how to implement it.

2a) How can I wait for the tasks without blocking the current thread?

2b) Can I return the current thread to the worker pool, let a waiter thread do the waiting, and then, after the wait, continue on the previous thread or another thread from the worker pool? How?

Other solutions

Don't use a fixed-size thread pool.

3) Is my idea to have 8 threads wrong? But how many then, if the hierarchies can be deep? And isn't there a risk that the JVM starts many tasks in parallel and switches between them a lot?

4) What do I miss or what would you do to solve that problem?

Thanks a lot and regards


[EDIT]

Accepted Solution and why I try something different (based on approach 2)

I accepted the ForkJoinPool as the correct solution.

However, some of the details, the possible overhead, and the loss of control make me want to try another approach. But the more I think about it, the more I come back to using ForkJoinPool (see the note at the end for the reason). Sorry for the amount of text.

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

"However, no such adjustments are guaranteed in the face of blocked IO or other unmanaged synchronization."

"maximum number of running threads to 32767"

http://homes.cs.washington.edu/~djg/teachingMaterials/grossmanSPAC_forkJoinFramework.html

"The documentation for the ForkJoin framework suggests creating parallel subtasks until the number of basic computation steps is somewhere over 100 and less than 10,000."

The 'hard work' Tasks read a lot of data from disk and are very far from 10,000 basic computations. I could actually fork/join them down to maybe acceptable levels, but that is too much work right now because that part of the code is rather complex.

I think approach 3a is basically a hand-rolled ForkJoin, except that I would have more control and probably less overhead, and the problems just mentioned should not exist (though there is no automatic adaptation to the CPU resources provided by the OS; I will force the OS to give me what I want if I have to).

I may try approach 2 with some changes: that way I can work with an exact thread count and I don't have any waiting threads; ForkJoinPool seems to work with waiting threads if I understand it correctly.

The current thread runs jobs until all jobs in this Split instance are being run by a worker thread (so work stealing in the Split node as before), but then it does not call future.get(); it just checks whether all futures are ready with future.isDone(). If not, it steals a job from the thread pool and executes it, then checks the futures again. That way I never wait as long as there is a single job that is not yet running.

The ugly part: if there is no job to steal, I would have to sleep for a short time and then check the futures again or steal a new job from the pool. (Is there a way to wait for multiple Futures to all complete, with a timeout that does not cancel the computations when it triggers?)

So I think I have to use a CompletionService around the ThreadPool in each Split Task; then I can poll with a timeout and do not need to sleep.
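A sketch of that CompletionService idea (class name and the square-number jobs are made up for illustration): poll with a timeout and, in the real framework, use the timeout window to steal a job instead of sleeping. The stealing itself is only a comment here.

```java
import java.util.concurrent.*;

public class PollingSplit {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Several CompletionServices can wrap the same pool; each Split
        // instance would own one and only sees its own completions.
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
        int jobs = 4;
        for (int i = 0; i < jobs; i++) {
            int n = i;
            cs.submit(() -> n * n);
        }
        int sum = 0, done = 0;
        while (done < jobs) {
            // poll() with a timeout never cancels the computations; it just
            // returns null if nothing finished in time, and that window is
            // where a real Split could steal and run another job itself.
            Future<Integer> f = cs.poll(50, TimeUnit.MILLISECONDS);
            if (f != null) { sum += f.get(); done++; }
        }
        pool.shutdown();
        if (sum != 0 + 1 + 4 + 9) throw new AssertionError("sum=" + sum);
        System.out.println("sum=" + sum);
    }
}
```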

Assumption: the ThreadPool inside the CompletionService can still be used like a normal ThreadPool (e.g. for job stealing), and one ThreadPool can back many CompletionServices.

I think this is the optimal solution for the problem detailed in the question. However, there is a small problem with that, see the following.

Note:

After looking at the 'hard' tasks again, I see that many of their instantiations can themselves be parallelized, so adding threading there too is the next logical step. They are always leaf nodes, and the work they do fits a CompletionService well (in some cases sub-jobs have different runtimes, but any two results can be combined into a new job). Doing them on the ForkJoinPool would require managedBlock() and an implementation of ForkJoinPool.ManagedBlocker, which makes the code more complex. At the same time, using a CompletionService in these leaf nodes means my approach-2-based solution will probably need waiting threads too, so I may be better off with ForkJoinPool after all.
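For reference, the managedBlock() plumbing mentioned above looks roughly like this (a sketch; `joinFuture` is a hypothetical helper that wraps a plain Future, and commonPool() requires Java 8). The pool may add a compensating worker while the blocker waits:

```java
import java.util.concurrent.*;

public class ManagedBlockDemo {
    // Adapts a plain Future to ForkJoinPool's ManagedBlocker protocol so
    // the pool can compensate with an extra thread while we wait.
    static <T> T joinFuture(Future<T> f) throws InterruptedException, ExecutionException {
        ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
            public boolean block() throws InterruptedException {
                try { f.get(); } catch (ExecutionException ignored) { }
                return true; // done blocking
            }
            public boolean isReleasable() { return f.isDone(); }
        });
        return f.get(); // non-blocking now: the future is already done
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newSingleThreadExecutor();
        Future<String> slow = io.submit(() -> { Thread.sleep(100); return "data"; });
        // Run the blocking wait from inside a ForkJoin worker thread.
        String result = ForkJoinPool.commonPool()
                .submit(() -> joinFuture(slow)).join();
        io.shutdown();
        if (!"data".equals(result)) throw new AssertionError(result);
        System.out.println(result);
    }
}
```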


Solution 3

I had to move away from ForkJoinPool; it did not use the threads optimally. It worked fine for the Loop and Split nodes, but no longer worked when I wanted to parallelize the leaf nodes where the actual work happens: when I add those as RecursiveTasks, most threads sit idle. The join() call does not steal work in the leaves for some reason (jdk1.7.0_45); it just waits. In my case all the work is in the leaves, so using a custom RecursiveTask subclass for the leaves is worse than using it only for Loop and Split nodes (because it waits after part of the work instead of after all of it). I don't think I'm using ForkJoinPool wrong; if you google, you find people with similar problems.

I now went with a simple solution: two thread pools, one fixed-size for the actual hard work and a cached one for all the Loop and Split nodes. I created FakeRecursiveTask (extend this instead of the original) so I did not have to change the code for Loop and Split. The leaves use HardWork as a base class, just so it is clear that they are something different; you simply call doHardWork(work).

With this solution all my worker threads are fully used all the time. Since the tree has limited size I should never run out of helper threads; in my case it mostly uses about the same number of helper threads as worker threads (so 8).

import java.util.List;
import java.util.concurrent.*;

public class ThreadPool3 {
        private static int maxNumWorkerThreads;
        private static ExecutorService workerPool = null;
        private static ExecutorService helperPool = null;

        public static void initThreadPool(int maxNumWorkerThreads_) {
                int availProcessors = Runtime.getRuntime().availableProcessors();
                if (maxNumWorkerThreads_ <= 0) {
                        maxNumWorkerThreads_ = availProcessors;
                }
                maxNumWorkerThreads = maxNumWorkerThreads_;

                if (availProcessors != maxNumWorkerThreads) {
                        System.out.println("WARN: maxNumWorkerThreads (" + maxNumWorkerThreads + ") != availProcessors (" + availProcessors + ")");
                }
                workerPool = Executors.newFixedThreadPool(maxNumWorkerThreads);
                BlockingQueue<Runnable> workQueue = new SynchronousQueue<Runnable>();
                helperPool = new ThreadPoolExecutor(0, 4 * maxNumWorkerThreads, 60, TimeUnit.MINUTES, workQueue,
                                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
        }


        public static abstract class HardWork implements Callable<Void> {
                @Override
                public abstract Void call() throws Exception;
        }

        public static void doHardWork(List<HardWork> tasks) throws Exception {
                workerPool.invokeAll(tasks);
        }


        /**
         * fake ForkJoinPool interface:
         */
        public static abstract class FakeRecursiveTask<T> implements Callable<T> {
                private Future<T> resultFuture = null;

                /**
                 * fake interface:
                 */
                public abstract T compute();

                /**
                 * fake interface:
                 */
                public T invoke() {
                        return compute();
                }

                /**
                 * fake interface:
                 */
                public void fork() {
                        resultFuture = helperPool.submit(this);
                }

                /**
                 * fake interface:
                 */
                public T join() {
                        try {
                                return resultFuture.get();
                        }
                        catch (Exception e) {
                                throw new RuntimeException(e);
                        }
                }

                @Override
                public T call() throws Exception {
                        return compute();
                }
        }


        public static void shutdownThreadPool() {
                if (workerPool != null) {
                        workerPool.shutdown();
                }
                if (helperPool != null) {
                        helperPool.shutdown();
                }
        }
}
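The two-pool pattern can be exercised with a self-contained miniature (`TwoPoolDemo`, the pool sizes, and the square-number jobs are made up; it uses simplified stand-ins rather than the FakeRecursiveTask/HardWork classes above):

```java
import java.util.*;
import java.util.concurrent.*;

public class TwoPoolDemo {
    // Fixed-size pool: exactly this many threads ever do hard work.
    static final ExecutorService workerPool = Executors.newFixedThreadPool(2);
    // Cached pool: inner (Loop/Split) nodes that mostly wait live here,
    // so their blocked threads never occupy a worker slot.
    static final ExecutorService helperPool = Executors.newCachedThreadPool();

    // Leaf work goes through invokeAll on the worker pool, mirroring
    // the HardWork/doHardWork pattern.
    static int leafSum(List<Integer> xs) throws Exception {
        List<Callable<Integer>> jobs = new ArrayList<>();
        for (int x : xs) jobs.add(() -> x * x);
        int s = 0;
        for (Future<Integer> f : workerPool.invokeAll(jobs)) s += f.get();
        return s;
    }

    public static void main(String[] args) throws Exception {
        // An inner "Split" node: fork two halves on the helper pool and
        // block there, like FakeRecursiveTask.fork()/join().
        Future<Integer> left  = helperPool.submit(() -> leafSum(Arrays.asList(1, 2)));
        Future<Integer> right = helperPool.submit(() -> leafSum(Arrays.asList(3, 4)));
        int total = left.get() + right.get();
        workerPool.shutdown();
        helperPool.shutdown();
        if (total != 1 + 4 + 9 + 16) throw new AssertionError("total=" + total);
        System.out.println("total=" + total);
    }
}
```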

OTHER TIPS

You seem to have a parallel "divide and conquer" type problem, where you are recursively splitting the problem into sub-problems to be "solved" using the available cores.

You are correct that a naive implementation that creates threads is likely to use a lot of resources, and that using a bounded thread pool will most likely deadlock.

The third alternative is the "fork/join" model implemented in Java 7. This is described in the Oracle Java tutorial, but I think that Dan Grossman's lecture notes do a better job of explaining it.
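A minimal RecursiveTask example of that fork/join model (a plain array sum, not the asker's framework; the threshold is artificially small for demonstration):

```java
import java.util.concurrent.*;

public class ForkJoinSum extends RecursiveTask<Long> {
    private final long[] data;
    private final int from, to;
    private static final int THRESHOLD = 4; // real code uses a much larger cutoff

    ForkJoinSum(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {           // small enough: solve directly
            long s = 0;
            for (int i = from; i < to; i++) s += data[i];
            return s;
        }
        int mid = (from + to) / 2;
        ForkJoinSum left  = new ForkJoinSum(data, from, mid);
        ForkJoinSum right = new ForkJoinSum(data, mid, to);
        left.fork();                            // run the left half asynchronously
        return right.compute() + left.join();   // this thread computes the right half
    }

    public static void main(String[] args) {
        long[] data = new long[100];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        long sum = new ForkJoinPool().invoke(new ForkJoinSum(data, 0, data.length));
        if (sum != 5050) throw new AssertionError("sum=" + sum);
        System.out.println(sum);
    }
}
```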

To avoid deadlocks completely, just do not use the synchronous Future.get(). Use the asynchronous composition methods of CompletableFuture instead (thenApply and thenCombine), available in Java 8. These methods do not block; they submit new tasks when the data becomes available. If you don't want to use Java 8, look at the Guava library, which (I believe) has equivalent facilities (ListenableFuture). Other asynchronous libraries exist, e.g. my own https://github.com/rfqu/df4j. Its advantage is that task objects can be reused, so fewer objects have to be created. If you provide a more detailed description of your problem (say, in ordinary sequential form, or written as if you had an infinite number of threads), I can help you implement your program with df4j.
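A minimal Java 8 sketch of this non-blocking style (the values and pool size are arbitrary): the combining steps run as new tasks when their inputs are ready, and only the top-level caller ever waits.

```java
import java.util.concurrent.*;

public class AsyncCompose {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // No pool thread ever blocks in get(): each stage is scheduled
        // when its inputs become available.
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 6, pool);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 7, pool);
        int result = a.thenCombine(b, (x, y) -> x * y) // wait for both, combine
                      .thenApply(v -> v + 1)           // then transform the result
                      .join();                         // only the caller waits here
        pool.shutdown();
        if (result != 43) throw new AssertionError("result=" + result);
        System.out.println(result);
    }
}
```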

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow