Here qu see my Priority ThreadPoolExecutor - it work good, but problem that it do not create new thread if number of сorePool еhread is achieved.
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by ngrigoriev on 4/24/14.
*/
public class PriorityExecutor extends ThreadPoolExecutor {
public PriorityExecutor(int corePoolSize, int maxPoolSize, int quequSize) {
super(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()));
}
public PriorityExecutor(int corePoolSize, int maxPoolSize, long time, TimeUnit unit, int quequSize) {
super(corePoolSize, maxPoolSize, time, unit, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()));
}
public PriorityExecutor() {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(11, new PriorityTaskComparator()));
}
public PriorityExecutor(final ThreadFactory threadFactory, int quequSize) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()), threadFactory);
}
public PriorityExecutor(final RejectedExecutionHandler handler, int quequSize) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()), handler);
}
public PriorityExecutor(final ThreadFactory threadFactory, final RejectedExecutionHandler handler, int quequSize) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()), threadFactory,
handler);
}
@Override
public Future<?> submit(final Runnable task) {
if (task == null)
throw new NullPointerException();
final RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
final RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
final RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
if (callable instanceof Important)
return new PriorityTask<>(((Important) callable).getPriority(), callable);
else
return new PriorityTask<>(0, callable);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
if (runnable instanceof Important)
return new PriorityTask<>(((Important) runnable).getPriority(), runnable, value);
else
return new PriorityTask<>(0, runnable, value);
}
public interface Important {
int getPriority();
}
private static final class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> {
private final int priority;
public PriorityTask(final int priority, final Callable<T> tCallable) {
super(tCallable);
this.priority = priority;
}
public PriorityTask(final int priority, final Runnable runnable, final T result) {
super(runnable, result);
this.priority = priority;
}
@Override
public int compareTo(final PriorityTask<T> o) {
final long diff = o.priority - priority;
return 0 == diff ? 0 : 0 > diff ? -1 : 1;
}
}
private static class PriorityTaskComparator implements Comparator<Runnable> {
@Override
public int compare(final Runnable left, final Runnable right) {
return ((PriorityTask) left).compareTo((PriorityTask) right);
}
}
}
During debug i have found that in method execute
in line 1368 we gave a condition to create a non core worker, but this condition never true this method(workerCountOf(recheck)
) can't be debug without debug byte code
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
1368 **else if (workerCountOf(recheck) == 0)**
addWorker(null, false);
}
It's actually an answer
The pool only creates non-core threads when the queue cannot accept
the tasks. The queue always accepts the task so it does not create
more threads.
Thanks, but can i change Thread pool policy to add new non core thread by some condition - for example by core thread state Waiting