Question

Here you can see my priority ThreadPoolExecutor. It works well, but the problem is that it does not create a new thread once the number of core pool threads has been reached.

import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by ngrigoriev on 4/24/14.
 */
public class PriorityExecutor extends ThreadPoolExecutor {

    public PriorityExecutor(int corePoolSize, int maxPoolSize, int quequSize) {
        super(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()));
    }

    public PriorityExecutor(int corePoolSize, int maxPoolSize, long time, TimeUnit unit, int quequSize) {
        super(corePoolSize, maxPoolSize, time, unit, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()));
    }

    public PriorityExecutor() {
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(11, new PriorityTaskComparator()));
    }

    public PriorityExecutor(final ThreadFactory threadFactory, int quequSize) {
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()), threadFactory);
    }

    public PriorityExecutor(final RejectedExecutionHandler handler, int quequSize) {
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()), handler);
    }

    public PriorityExecutor(final ThreadFactory threadFactory, final RejectedExecutionHandler handler, int quequSize) {
        super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<>(quequSize, new PriorityTaskComparator()), threadFactory,
                handler);
    }

    @Override
    public Future<?> submit(final Runnable task) {
        if (task == null)
            throw new NullPointerException();
        final RunnableFuture<Object> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null)
            throw new NullPointerException();
        final RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        final RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
        if (callable instanceof Important)
            return new PriorityTask<>(((Important) callable).getPriority(), callable);
        else
            return new PriorityTask<>(0, callable);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
        if (runnable instanceof Important)
            return new PriorityTask<>(((Important) runnable).getPriority(), runnable, value);
        else
            return new PriorityTask<>(0, runnable, value);
    }

    public interface Important {
        int getPriority();
    }

    private static final class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>> {
        private final int priority;

        public PriorityTask(final int priority, final Callable<T> tCallable) {
            super(tCallable);

            this.priority = priority;
        }

        public PriorityTask(final int priority, final Runnable runnable, final T result) {
            super(runnable, result);

            this.priority = priority;
        }

        @Override
        public int compareTo(final PriorityTask<T> o) {
            // Higher priority sorts first; Integer.compare avoids the int overflow of o.priority - priority.
            return Integer.compare(o.priority, priority);
        }
    }

    private static class PriorityTaskComparator implements Comparator<Runnable> {
        @Override
        public int compare(final Runnable left, final Runnable right) {
            return ((PriorityTask) left).compareTo((PriorityTask) right);
        }
    }
}

While debugging, I found that the execute method has a condition at line 1368 for creating a non-core worker, but this condition is never true. I cannot step into workerCountOf(recheck) because the JDK classes were compiled without debug information.

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0) // line 1368
            addWorker(null, false);
    }

It's actually an answer

The pool only creates non-core threads when the queue cannot accept the tasks. The queue always accepts the task so it does not create more threads.
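This is easy to reproduce in isolation. The following is a minimal sketch, not the asker's class: `UnboundedQueueDemo`, `BlockingTask` and `poolSizeAfterSubmitting` are made-up names, and the tasks simply block on a latch so the pool size can be read while they are still running. With a core size of 1 and a maximum of 4, the pool should still report a single thread after four submissions, because every `offer` to the unbounded `PriorityBlockingQueue` succeeds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {

    /** Blocks until released; Comparable so the PriorityBlockingQueue can order it. */
    static final class BlockingTask implements Runnable, Comparable<BlockingTask> {
        private final int priority;
        private final CountDownLatch release;

        BlockingTask(int priority, CountDownLatch release) {
            this.priority = priority;
            this.release = release;
        }

        @Override
        public void run() {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public int compareTo(BlockingTask other) {
            return Integer.compare(other.priority, priority); // higher priority first
        }
    }

    /** Submits taskCount blocking tasks and returns the pool size while they are pending. */
    static int poolSizeAfterSubmitting(int corePoolSize, int maxPoolSize, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(new BlockingTask(i, release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfterSubmitting(1, 4, 4));
    }
}
```

With a core size of 0, as in the no-argument constructor from the question, the same method should also return 1: the `workerCountOf(recheck) == 0` branch starts exactly one worker and the remaining tasks are queued.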

Thanks, but can I change the thread pool policy so that a new non-core thread is added under some condition, for example depending on whether the core threads are in the WAITING state?

Solution

You will need to provide a custom BlockingQueue and RejectedExecutionHandler implementation which wraps your priority queue. This custom queue will need a reference to the ThreadPoolExecutor and choose based on the current active count of the ThreadPoolExecutor whether or not to accept "offered" Runnable instances. It must also be the rejected execution handler for the ThreadPoolExecutor so that if multiple objects are being offered concurrently, causing the max size to be exceeded, jobs are still added to the priority queue.
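One way this could look is sketched below. It is only an illustration under several assumptions: `ScalingPriorityQueue`, `setExecutor`, `ScalingDemo` and `DemoTask` are hypothetical names, the "all threads busy" check uses `getActiveCount()` (which the Javadoc describes as approximate), and the demo waits for each task to start before submitting the next so that the active count is up to date.

```java
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical queue that refuses offers while the pool is saturated but can still grow. */
class ScalingPriorityQueue extends PriorityBlockingQueue<Runnable> implements RejectedExecutionHandler {
    private transient volatile ThreadPoolExecutor executor;

    ScalingPriorityQueue(int initialCapacity, Comparator<? super Runnable> comparator) {
        super(initialCapacity, comparator);
    }

    void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public boolean offer(Runnable task) {
        ThreadPoolExecutor pool = executor;
        if (pool != null) {
            int poolSize = pool.getPoolSize();
            // Every thread is busy and there is room to grow: refuse the task,
            // so that execute() falls through to creating a non-core thread.
            if (pool.getActiveCount() >= poolSize && poolSize < pool.getMaximumPoolSize()) {
                return false;
            }
        }
        return super.offer(task);
    }

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
        if (pool.isShutdown()) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        // The pool hit its maximum in the meantime: queue the task after all.
        super.offer(task);
    }
}

class ScalingDemo {
    static final class DemoTask implements Runnable {
        final int priority;
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release;

        DemoTask(int priority, CountDownLatch release) {
            this.priority = priority;
            this.release = release;
        }

        @Override
        public void run() {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns {pool size, queue size} after submitting taskCount blocking tasks. */
    static int[] run(int core, int max, int taskCount) {
        Comparator<Runnable> byPriority =
                (a, b) -> Integer.compare(((DemoTask) b).priority, ((DemoTask) a).priority);
        ScalingPriorityQueue queue = new ScalingPriorityQueue(11, byPriority);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS, queue, queue);
        queue.setExecutor(pool);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                DemoTask task = new DemoTask(i, release);
                pool.execute(task);
                if (i < max) {
                    task.started.await(5, TimeUnit.SECONDS); // make sure the worker counts as active
                }
            }
            return new int[] { pool.getPoolSize(), queue.size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

In this sketch, `ScalingDemo.run(1, 3, 4)` should grow the pool to three threads and leave the fourth task in the priority queue. Note that overriding `offer` also changes what `add` and `put` do on the queue, since `PriorityBlockingQueue` routes them through `offer`; that does not matter for `ThreadPoolExecutor`, which only calls `offer`, but it would matter if other code used the queue directly.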

The alternative (which is /much/ simpler) is to simply set your "core" size to the max number you wish to allow and allow core threads to time out.
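A minimal sketch of that simpler variant, assuming a hypothetical helper class `CoreTimeoutDemo` rather than the `PriorityExecutor` from the question: the core and maximum sizes are the same value, and `allowCoreThreadTimeOut(true)` lets idle threads exit after the keep-alive time (which must be greater than zero for that call to be accepted).

```java
import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {

    /** Builds a pool whose core size equals its maximum and whose idle threads may time out. */
    static ThreadPoolExecutor newPool(int maxThreads, long keepAlive, TimeUnit unit,
                                      Comparator<? super Runnable> comparator) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, keepAlive, unit,
                new PriorityBlockingQueue<Runnable>(11, comparator));
        pool.allowCoreThreadTimeOut(true); // requires keepAlive > 0
        return pool;
    }

    /** Submits taskCount blocking tasks and returns the pool size while they are pending. */
    static int poolSizeAfterSubmitting(int maxThreads, int taskCount) {
        // Placeholder ordering for the demo; a real pool would compare task priorities.
        Comparator<Runnable> noOrdering = (a, b) -> 0;
        ThreadPoolExecutor pool = newPool(maxThreads, 60L, TimeUnit.SECONDS, noOrdering);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Because the pool starts a new thread for every submission until the core size is reached, `poolSizeAfterSubmitting(3, 5)` should report three threads with the remaining two tasks waiting in the priority queue. For the class in the question, the equivalent change would be to pass the same value for `corePoolSize` and `maxPoolSize` and call `allowCoreThreadTimeOut(true)` on the executor.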

Licensed under: CC-BY-SA with attribution