specifying ThreadPoolExecutor problem
-
07-07-2019 - |
Question
Is there any way to create Executor that will have always at least 5 threads, and maximum of 20 threads, and unbounded queue for tasks (meaning no task is rejected)
I tried new ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue)
with all possibilities that I thought of for queue:
new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected
and none worked as wanted.
Solution
Maybe something like this would work for you? I just whipped it up so please poke at it. Basically, it implements an overflow thread pool that is used to feed the underlying ThreadPoolExecutor
There are two major draw backs I see with it:
- The lack of a returned Future object on
submit()
. But maybe that is not an issue for you. - The secondary queue will only empty into the
ThreadPoolExecutor
when jobs are submitted. There has got to be an elegant solution, but I don't see it just yet. If you know that there will be a stead stream of tasks into theStusMagicExecutor
then this may not be an issue. ("May" being the key word.) An option might to be to have your submitted tasks poke at theStusMagicExecutor
after they complete?
Stu's Magic Executor:
public class StusMagicExecutor extends ThreadPoolExecutor {
private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE.
public StusMagicExecutor() {
super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());
}
public void queueRejectedTask(Runnable task) {
try {
secondaryQueue.put(task);
} catch (InterruptedException e) {
// do something
}
}
public Future submit(Runnable newTask) {
//drain secondary queue as rejection handler populates it
Collection<Runnable> tasks = new ArrayList<Runnable>();
secondaryQueue.drainTo(tasks);
tasks.add(newTask);
for (Runnable task : tasks)
super.submit(task);
return null; //does not return a future!
}
}
class RejectionHandler implements RejectedExecutionHandler {
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
((StusMagicExecutor)executor).queueRejectedTask(runnable);
}
}
OTHER TIPS
The javadocs for ThreadPoolExecutor
are pretty clear that once corePoolSize
threads have been created, new threads will only be created once the queue is full. So if you set core
to 5 and max
to 20, you'll never get your desired behaviour.
However, if you set both core
and max
to 20, then tasks will only get added to the queue if all 20 threads are busy. Of course, this renders your "5 threads minimum" requirement a bit meaningless, since all 20 will be kept alive (until they idle out, anyway).
I think this problem is a shortcoming of the class and very misleading given the constructor parameter combinations. Here's a solution taken from SwingWorker's inner ThreadPoolExecutor that I made into a top level class. It doesn't have a minimum but does at least use an upper bound. The only thing I don't know is what performance hit you get from the locking execute.
public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = pauseLock.newCondition();
private boolean isPaused = false;
private final ReentrantLock executeLock = new ReentrantLock();
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
}
public BoundedThreadPoolExecutor(int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
@Override
public void execute(Runnable command) {
executeLock.lock();
try {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
setCorePoolSize(getMaximumPoolSize());
super.execute(command);
setCorePoolSize(0);
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
} finally {
executeLock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
pauseLock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException ignore) {
} finally {
pauseLock.unlock();
}
}
}