política ThreadPoolExecutor
-
26-09-2019 - |
Pregunta
Estoy intentando utilizar un ThreadPoolExecutor a programar tareas, pero el funcionamiento en algunos problemas con sus políticas. Aquí está su comportamiento declarado:
- Si menos de hilos corePoolSize se están ejecutando, el Ejecutor siempre prefiere la adición de un nuevo hilo en lugar de hacer cola.
- Si corePoolSize o más hilos están ejecutando, el Ejecutor siempre prefiere hacer cola una petición en lugar de añadir un nuevo hilo.
- Si la solicitud no puede ponerse en cola, se crea un nuevo hilo siempre que ello no exceda maximumPoolSize, en cuyo caso, se rechazará la tarea.
El comportamiento que quiero es la siguiente:
- mismo que el anterior
- Si más de corePoolSize pero menos de hilos maximumPoolSize se están ejecutando, prefiere la adición de un nuevo hilo sobre la puesta en cola, y el uso de un subproceso inactivo sobre la adición de un nuevo hilo.
- mismo que el anterior
Básicamente no quiero ninguna tarea a ser rechazados; Quiero que se pongan en cola en una cola sin límites. Pero yo quiero tener hasta maximumPoolSize hilos. Si utilizo una cola sin límites, nunca genera hilos después de haber caído coreSize. Si utilizo una cola limitada, rechaza tareas. ¿Hay alguna forma de evitar esto?
Lo que estoy pensando ahora se está ejecutando el ThreadPoolExecutor en un SynchronousQueue, pero no la alimentación de tareas directamente a él - en lugar de alimentar a un LinkedBlockingQueue sin límites por separado. Luego otros alimentos hilo desde el LinkedBlockingQueue en el ejecutor, y si uno es rechazado, sino que simplemente lo intenta de nuevo hasta que no se rechaza. Este parece ser un dolor y un poco de corte, sin embargo -? ¿Existe una forma más limpia de hacer esto
Solución
Su caso de uso es común, completamente de fiar y por desgracia más difícil de lo que uno esperaría. Para información de fondo se puede leer esta discusión y encontrar un puntero a una solución (también mencionado en el hilo) aquí . La solución de Shay funciona bien.
En general, me gustaría ser un poco preocupado de colas sin límites; por lo general es mejor tener el control de flujo de entrada explícita de que se degrada con gracia y regula la relación de trabajo actual / restante para no abrumar ya sea productor o consumidor.
Otros consejos
Es probable que no es necesario micro-gestión de la agrupación de hebras que se solicita.
Un grupo de subprocesos en caché se volverá a utilizar subprocesos inactivos mientras que también permite hilos concurrentes potencialmente ilimitadas. Por supuesto, esto podría conducir a disminuir el rendimiento fuera de control de cambio de contexto por encima durante períodos de ráfagas.
Executors.newCachedThreadPool();
Una mejor opción es colocar un límite en el número total de hilos, pero liberada por la idea de asegurar hebras en espera se utilizó por primera vez. Los cambios de configuración serían:
corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);
Razonando sobre este escenario, si el ejecutor tiene menos de hilos corePoolSize
, de lo que no debe estar muy ocupado. Si el sistema no está muy ocupado, entonces hay poco daño en el hilado hasta un nuevo hilo. Hacer esto hará que su ThreadPoolExecutor
para crear siempre un nuevo trabajador si es bajo el número máximo permitido de los trabajadores. Sólo cuando el número máximo de trabajadores están "en marcha" de brazos cruzados esperando trabajadores para tareas darse tareas. Si un trabajador espera aReasonableTimeDuration
sin una tarea, a continuación, se le permite dar por terminado. El uso de límites razonables para el tamaño del grupo (después de todo, sólo hay tantas CPU) y un tiempo de espera razonablemente grande (para mantener los hilos de terminación innecesariamente), es probable que se ven los beneficios deseados.
La última opción es hacker. Básicamente, el ThreadPoolExecutor
utiliza internamente BlockingQueue.offer
para determinar si la cola tiene la capacidad. Una implementación personalizada de BlockingQueue
siempre podía rechazar el intento de offer
. Cuando el ThreadPoolExecutor
no offer
una tarea a la cola, que tratará de hacer un nuevo trabajador. Si un nuevo trabajador no puede ser creado, un RejectedExecutionHandler
sería llamado. En ese momento, un RejectedExecutionHandler
personalizada podría forzar una put
en el BlockingQueue
personalizado.
/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
BlockingQueue<T> delegate;
public boolean offer(T item) {
return false;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
delegate.put(r);
}
//.... delegate methods
}
El sistema justo corePoolsize = maximumPoolSize
y utilizar una cola sin límites?
En la lista de puntos, 1 excluye 2, ya que corePoolSize
será siempre menor o igual que maximumPoolSize
.
Editar
Hay todavía algo incompatibles entre lo que quiere y lo que van a ofrecer TPE.
Si usted tiene una cola sin límites, maximumPoolSize
se ignora así, como has observado, no más de hilos corePoolSize
será jamás creados y utilizados.
Así que, de nuevo, si se toma corePoolsize = maximumPoolSize
con una cola sin límites, usted tiene lo que quiere, ¿no?
¿Quieres que estar buscando algo más parecido a un grupo de subprocesos en caché?