la politica ThreadPoolExecutor
-
26-09-2019 - |
Domanda
Sto cercando di utilizzare un ThreadPoolExecutor per pianificare le attività, ma incorrere in alcuni problemi con le sue politiche. Ecco il suo comportamento dichiarato:
- Se meno di discussioni corePoolSize sono in esecuzione, l'Esecutore sempre preferisce l'aggiunta di un nuovo thread, piuttosto che fare la coda.
- Se corePoolSize o più thread sono in esecuzione, l'Esecutore preferisce sempre fare la coda di una richiesta piuttosto che aggiungere un nuovo thread.
- Se la richiesta non può essere messo in coda, un nuovo thread si crea meno che ciò non superare maximumPoolSize, nel qual caso, l'attività verrà rifiutato.
Il comportamento che voglio è questa:
- come sopra
- Se più di corePoolSize ma inferiore discussioni maximumPoolSize sono in esecuzione, preferisce l'aggiunta di un nuovo filo sopra accodamento, e utilizzando un thread inattivo sopra aggiungendo un nuovo filo.
- come sopra
In fondo non voglio alcuna attività da respingere; Voglio che siano accodati in una coda infinita. Ma io voglio avere fino a discussioni maximumPoolSize. Se uso una coda infinita, non è mai genera discussioni dopo che colpisce coreSize. Se io uso una coda delimitata, respinge compiti. C'è un modo per aggirare questo?
Quello che sto pensando ora è in esecuzione il ThreadPoolExecutor su uno SynchronousQueue, ma non alimentando attività direttamente ad essa - invece li alimenta ad un LinkedBlockingQueue illimitata separata. Poi un altro feed filo da LinkedBlockingQueue nella esecutore, e se uno viene respinto, si cerca semplicemente di nuovo fino a quando non viene rifiutata. Questo mi sembra un dolore e un po 'di un hack, anche se -? C'è un modo più pulito per fare questo
Soluzione
Il caso d'uso è comune, del tutto legittime e, purtroppo, più difficile di quanto ci si aspetterebbe. Per info sfondo si può leggere questa discussione e trovare un puntatore a una soluzione (menzionato anche nel thread) qui . La soluzione di Shay funziona bene.
In generale mi piacerebbe essere un po 'preoccupati di code illimitate; di solito è meglio avere esplicito controllo di flusso in entrata che si degrada con grazia e regola il rapporto tra la corrente di lavoro / rimanente per non sopraffare sia produttore o consumatore.
Altri suggerimenti
Probabilmente non è necessario micro-gestire il pool di thread ad essere richiesto.
A nella cache pool di thread si ri-utilizzare thread inattivi consentendo anche thread concorrenti potenzialmente illimitato. Questo ovviamente potrebbe portare a instabilità degradanti prestazioni dal cambio di contesto ambientale durante periodi impulsivi.
Executors.newCachedThreadPool();
Una soluzione migliore è quella di porre un limite al numero totale di thread scartando la nozione di assicurare thread inattivi vengono utilizzati prima. Le modifiche di configurazione potrebbe essere:
corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);
Il ragionamento su questo scenario, se l'esecutore ha meno di discussioni corePoolSize
, che non deve essere molto occupato. Se il sistema non è molto occupato, quindi c'è poco danno in filatura un nuovo thread. Facendo questo farà sì che il vostro ThreadPoolExecutor
per creare sempre un nuovo lavoratore se è sotto il numero massimo di lavoratori consentito. Solo quando il numero massimo di lavoratori sono "esecuzione" saranno i lavoratori in attesa pigramente per le attività sarà dato compiti. Se un lavoratore attende aReasonableTimeDuration
senza un compito, allora è permesso di interrompere. Utilizzando limiti ragionevoli per la dimensione del pool (dopo tutto, ci sono solo tante CPU) e una abbastanza grande timeout (per mantenere le discussioni da inutilmente di terminazione), saranno probabilmente visti i benefici desiderati.
L'ultima opzione è hacker. Fondamentalmente, il ThreadPoolExecutor
utilizza internamente BlockingQueue.offer
per determinare se la coda ha capacità. Un'implementazione personalizzata di BlockingQueue
può sempre rifiutare il tentativo offer
. Quando il ThreadPoolExecutor
non riesce a offer
un compito alla coda, si cercherà di fare un nuovo lavoratore. Se un nuovo lavoratore non può essere creato, un RejectedExecutionHandler
sarebbe chiamato. A quel punto, un RejectedExecutionHandler
personalizzato potrebbe costringere un put
nella BlockingQueue
personalizzato.
/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
BlockingQueue<T> delegate;
public boolean offer(T item) {
return false;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
delegate.put(r);
}
//.... delegate methods
}
Basta impostare corePoolsize = maximumPoolSize
e utilizzare una coda infinita?
Nel vostro elenco di punti, 1 esclude 2, dal momento che corePoolSize
sarà sempre minore o uguale di maximumPoolSize
.
Modifica
C'è ancora qualcosa di incompatibile tra ciò che si vuole e ciò TPE vi offrirà.
Se si dispone di una coda infinita, maximumPoolSize
viene ignorata in modo, come si osserva, non più di discussioni corePoolSize
potrà mai essere creato e utilizzato.
Quindi, ancora una volta, se si prende corePoolsize = maximumPoolSize
con una coda infinita, si dispone di ciò che si vuole, no?
VUOI essere alla ricerca di qualcosa di più simile a un pool di thread cache?