Pregunta

¿Hay alguna forma de crear un Ejecutor que siempre tenga al menos 5 hilos, y un máximo de 20 hilos, y una cola ilimitada para tareas (lo que significa que no se rechaza ninguna tarea)

Probé un nuevo ThreadPoolExecutor (5, 20, 60L, TimeUnit.SECONDS, queue) con todas las posibilidades que pensé para hacer cola:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

y ninguno funcionó como se quería.

¿Fue útil?

Solución

¿Quizás algo como esto funcionaría para ti? Lo acabo de agitar así que por favor tócalo. Básicamente, implementa un grupo de subprocesos de desbordamiento que se utiliza para alimentar el ThreadPoolExecutor

subyacente

Hay dos inconvenientes principales que veo con él:

  • La falta de un objeto Future devuelto en submit () . Pero tal vez eso no sea un problema para usted.
  • La cola secundaria solo se vaciará en ThreadPoolExecutor cuando se envíen trabajos. Tiene que haber una solución elegante, pero todavía no la veo. Si sabe que habrá una secuencia constante de tareas en el StusMagicExecutor , entonces esto puede no ser un problema. ("Mayo" es la palabra clave). Una opción podría ser que sus tareas enviadas toquen el StusMagicExecutor después de que se completen?

El ejecutor mágico de Stu:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}

Otros consejos

Los javadocs para ThreadPoolExecutor son bastante claros que una vez que se han creado los hilos corePoolSize , los nuevos hilos solo se crearán una vez que la cola esté llena. Entonces, si configura core en 5 y max en 20, nunca obtendrá el comportamiento deseado.

Sin embargo, si establece core y max en 20, las tareas solo se agregarán a la cola si los 20 hilos están ocupados. Por supuesto, esto hace que sus "5 hilos como mínimo" requisito un poco sin sentido, ya que los 20 se mantendrán vivos (hasta que estén inactivos, de todos modos).

Creo que este problema es una deficiencia de la clase y muy engañoso dadas las combinaciones de parámetros del constructor. Aquí hay una solución tomada del ThreadPoolExecutor interno de SwingWorker que hice en una clase de nivel superior. No tiene un mínimo, pero al menos usa un límite superior. Lo único que no sé es qué impacto de rendimiento obtienes de la ejecución de bloqueo.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top