Pregunta

Parece imposible realizar un grupo de subprocesos en caché con un límite en el número de subprocesos que puede crear.

Aquí es cómo se implementa Executors.newCachedThreadPool estático en la biblioteca estándar de Java:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Entonces, usando esa plantilla para crear un grupo de subprocesos en caché de tamaño fijo:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Ahora, si usa esto y envía 3 tareas, todo estará bien. El envío de cualquier otra tarea dará como resultado excepciones de ejecución rechazadas.

Intentando esto:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

El resultado será que todos los hilos se ejecuten secuencialmente. Es decir, el grupo de subprocesos nunca creará más de un subproceso para manejar sus tareas.

¿Este es un error en el método de ejecución de ThreadPoolExecutor? O tal vez esto es intencional? ¿O hay alguna otra forma?

Editar: quiero algo exactamente como el grupo de subprocesos en caché (crea subprocesos bajo demanda y luego los mata después de un tiempo de espera) pero con un límite en el número de subprocesos que puede crear y la capacidad de continuar haciendo cola en tareas adicionales Una vez que ha alcanzado su límite de hilo. Según la respuesta de sjlee, esto es imposible. Mirando el método execute () de ThreadPoolExecutor, es realmente imposible. Necesitaría subclasificar ThreadPoolExecutor y anular execute () de manera similar a lo que hace SwingWorker, pero lo que SwingWorker hace en execute () es un hack completo.

¿Fue útil?

Solución

El ThreadPoolExecutor tiene los siguientes comportamientos clave y sus problemas pueden explicarse por estos comportamientos.

Cuando se envían tareas,

  1. Si el grupo de subprocesos no ha alcanzado el tamaño del núcleo, crea nuevos subprocesos.
  2. Si se ha alcanzado el tamaño del núcleo y no hay hilos inactivos, pone en cola las tareas.
  3. Si se ha alcanzado el tamaño del núcleo, no hay subprocesos inactivos y la cola se llena, crea nuevos subprocesos (hasta que alcanza el tamaño máximo).
  4. Si se ha alcanzado el tamaño máximo, no hay subprocesos inactivos, y la cola se llena, se activa la política de rechazo.

En el primer ejemplo, tenga en cuenta que SynchronousQueue tiene esencialmente un tamaño de 0. Por lo tanto, en el momento en que alcanza el tamaño máximo (3), se activa la política de rechazo (# 4).

En el segundo ejemplo, la cola elegida es LinkedBlockingQueue que tiene un tamaño ilimitado. Por lo tanto, te quedas atascado con el comportamiento # 2.

No se puede jugar mucho con el tipo en caché o el tipo fijo, ya que su comportamiento está casi completamente determinado.

Si desea tener un grupo de subprocesos limitado y dinámico, debe utilizar un tamaño de núcleo positivo y un tamaño máximo combinados con una cola de un tamaño finito. Por ejemplo,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Addendum : esta es una respuesta bastante antigua, y parece que JDK cambió su comportamiento cuando se trata de un tamaño de núcleo de 0. Desde JDK 1.6, si el tamaño de núcleo es 0 y la agrupación sí no tiene ningún hilo, el ThreadPoolExecutor agregará un hilo para ejecutar esa tarea. Por lo tanto, el tamaño del núcleo de 0 es una excepción a la regla anterior. Gracias Steve por trayendo eso a mi atención.

Otros consejos

A menos que haya pasado algo por alto, la solución a la pregunta original es simple. El siguiente código implementa el comportamiento deseado como lo describe el póster original. Generará hasta 5 subprocesos para trabajar en una cola ilimitada y los subprocesos inactivos terminarán después de 60 segundos.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

Tuve el mismo problema. Como ninguna otra respuesta pone todos los problemas juntos, estoy agregando el mío:

Ahora está claramente escrito en docs : If utiliza una cola que no bloquea ( LinkedBlockingQueue ) la configuración de subprocesos máximos no tiene efecto, solo se usan subprocesos centrales.

entonces:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Este ejecutor tiene:

  1. No hay concepto de subprocesos máximos ya que estamos usando una cola ilimitada. Esto es algo bueno porque dicha cola puede hacer que el ejecutor cree una cantidad masiva de subprocesos adicionales no centrales si sigue su política habitual.

  2. Una cola de tamaño máximo Integer.MAX_VALUE . Submit () lanzará RejectedExecutionException si el número de tareas pendientes supera a Integer.MAX_VALUE . No estoy seguro de que primero se nos acabe la memoria o esto sucederá.

  3. Tiene 4 hilos de núcleo posibles. Los subprocesos centrales inactivos se cierran automáticamente si están inactivos durante 5 segundos. Entonces, sí, estrictamente los subprocesos a pedido. El número puede modificarse utilizando el método setThreads () .

  4. Se asegura de que el número mínimo de subprocesos principales nunca sea inferior a uno, de lo contrario submit () rechazará todas las tareas. Como los subprocesos centrales deben ser > = subprocesos máximos, el método setThreads () establece también los subprocesos máximos, aunque la configuración de subprocesos máximos no es útil para una cola ilimitada.

En su primer ejemplo, las tareas posteriores se rechazan porque el AbortPolicy es el predeterminado RejectedExecutionHandler . ThreadPoolExecutor contiene las siguientes políticas, que puede cambiar a través del método setRejectedExecutionHandler :

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Parece que quiere un grupo de subprocesos en caché con CallerRunsPolicy.

Ninguna de las respuestas aquí solucionó mi problema, que tenía que ver con crear una cantidad limitada de conexiones HTTP utilizando el cliente HTTP de Apache (versión 3.x). Como me llevó algunas horas encontrar una buena configuración, compartiré:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Esto crea un ThreadPoolExecutor que comienza con cinco y contiene un máximo de diez subprocesos que se ejecutan simultáneamente utilizando CallerRunsPolicy para ejecutar.

Según el Javadoc para ThreadPoolExecutor:

  

Si hay más de corePoolSize pero menos de maximumPoolSize subprocesos en ejecución, se creará un nuevo subproceso solo si la cola está llena . Al configurar corePoolSize y maximumPoolSize de la misma manera, crea un grupo de subprocesos de tamaño fijo.

(Énfasis mío)

La respuesta de

jitter es lo que quieres, aunque la mía responde a tu otra pregunta. :)

hay una opción más. En lugar de usar el nuevo SynchronousQueue, también puede usar cualquier otra cola, pero debe asegurarse de que su tamaño sea 1, de modo que obligue al servicio del ejecutor a crear un nuevo hilo.

No parece que ninguna de las respuestas responda realmente a la pregunta, de hecho, no puedo ver una forma de hacerlo, incluso si subclasifica desde PooledExecutorService ya que muchos de los métodos / propiedades son privados, p. haciendo que addIfUnderMaximumPoolSize esté protegido, puede hacer lo siguiente:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Lo más cercano que obtuve fue esto, pero incluso eso no es una muy buena solución

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

p.s. no probado lo anterior

Esto es lo que quieres (al menos eso creo). Para obtener una explicación, consulte Jonathan Feinberg responder

Executors.newFixedThreadPool (int n)

  

Crea un grupo de subprocesos que reutiliza un número fijo de subprocesos que funcionan desde una cola compartida no acotada. En cualquier momento, como máximo, las hebras nThreads serán tareas de procesamiento activas. Si se envían tareas adicionales cuando todos los subprocesos están activos, esperarán en la cola hasta que haya un subproceso disponible. Si algún subproceso termina debido a un error durante la ejecución antes del cierre, uno nuevo tomará su lugar si es necesario para ejecutar tareas posteriores. Los subprocesos de la agrupación existirán hasta que se cierre explícitamente.

Aquí hay otra solución. Creo que esta solución se comporta como usted desea (aunque no está orgullosa de esta solución):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
  1. Puede usar ThreadPoolExecutor como lo sugiere @sjlee

    Puede controlar el tamaño del grupo dinámicamente. Eche un vistazo a esta pregunta para obtener más detalles:

    Grupo dinámico de subprocesos

    OR

  2. Puede usar newWorkStealingPool API, que se ha introducido con java 8.

    public static ExecutorService newWorkStealingPool()
    
      

    Crea un grupo de subprocesos de robo de trabajo utilizando todos los procesadores disponibles como su nivel de paralelismo objetivo.

De manera predeterminada, el nivel de paralelismo se establece en el número de núcleos de CPU en su servidor. Si tiene un servidor CPU de 4 núcleos, el tamaño del grupo de subprocesos sería 4. Esta API devuelve el tipo de ForkJoinPool de ExecutorService y permite el robo de trabajo de subprocesos inactivos robando tareas de subprocesos ocupados en ForkJoinPool.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top