Вопрос

Есть ли какой-либо способ создать исполнителя, у которого всегда будет не менее 5 потоков и не более 20 потоков и неограниченная очередь для задач (это означает, что ни одна задача не отклоняется)

Я попробовал что-то новое ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) со всеми возможностями, которые я придумал для очереди:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

и ни один из них не сработал так, как хотелось.

Это было полезно?

Решение

Может быть, что-то подобное сработало бы для вас?Я только что приготовила его на скорую руку, так что, пожалуйста, потрогайте.По сути, он реализует пул потоков переполнения, который используется для подачи базового ThreadPoolExecutor

Я вижу в этом два основных недостатка:

  • Отсутствие возвращенного объекта Future на submit().Но, возможно, для вас это не проблема.
  • Вторичная очередь опустеет только в ThreadPoolExecutor когда будут отправлены задания.Должно быть элегантное решение, но я пока его не вижу.Если вы знаете, что в систему будет поступать постоянный поток задач StusMagicExecutor тогда, возможно, это не проблема.(Ключевое слово "Может".) Одним из вариантов может быть, чтобы отправленные вами задания указывали на StusMagicExecutor после того, как они завершатся?

Волшебный Исполнитель Стью:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}

Другие советы

В javadoc для ThreadPoolExecutor достаточно ясно, что после создания потоков corePoolSize новые потоки будут создаваться только после заполнения очереди. Так что если вы установите core на 5 и max на 20, вы никогда не получите желаемого поведения.

Однако, если вы установите для core и max значение 20, задачи будут добавляться в очередь только в том случае, если все 20 потоков заняты. Конечно, это делает ваши "5 потоков минимум" требование немного бессмысленно, так как все 20 останутся в живых (во всяком случае, до тех пор, пока они не остановятся).

Я думаю, что эта проблема является недостатком класса и вводит в заблуждение, учитывая комбинации параметров конструктора. Вот решение, взятое из внутреннего ThreadPoolExecutor SwingWorker, которое я превратил в класс верхнего уровня. У него нет минимума, но, по крайней мере, используется верхняя граница. Единственное, чего я не знаю, так это того, какой удар по производительности вы получаете при выполнении блокировки.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top