Domanda

Sto cercando di utilizzare un ThreadPoolExecutor per pianificare le attività, ma incorrere in alcuni problemi con le sue politiche. Ecco il suo comportamento dichiarato:

  1. Se meno di discussioni corePoolSize sono in esecuzione, l'Esecutore sempre preferisce l'aggiunta di un nuovo thread, piuttosto che fare la coda.
  2. Se corePoolSize o più thread sono in esecuzione, l'Esecutore preferisce sempre fare la coda di una richiesta piuttosto che aggiungere un nuovo thread.
  3. Se la richiesta non può essere messo in coda, un nuovo thread si crea meno che ciò non superare maximumPoolSize, nel qual caso, l'attività verrà rifiutato.

Il comportamento che voglio è questa:

  1. come sopra
  2. Se più di corePoolSize ma inferiore discussioni maximumPoolSize sono in esecuzione, preferisce l'aggiunta di un nuovo filo sopra accodamento, e utilizzando un thread inattivo sopra aggiungendo un nuovo filo.
  3. come sopra

In fondo non voglio alcuna attività da respingere; Voglio che siano accodati in una coda infinita. Ma io voglio avere fino a discussioni maximumPoolSize. Se uso una coda infinita, non è mai genera discussioni dopo che colpisce coreSize. Se io uso una coda delimitata, respinge compiti. C'è un modo per aggirare questo?

Quello che sto pensando ora è in esecuzione il ThreadPoolExecutor su uno SynchronousQueue, ma non alimentando attività direttamente ad essa - invece li alimenta ad un LinkedBlockingQueue illimitata separata. Poi un altro feed filo da LinkedBlockingQueue nella esecutore, e se uno viene respinto, si cerca semplicemente di nuovo fino a quando non viene rifiutata. Questo mi sembra un dolore e un po 'di un hack, anche se -? C'è un modo più pulito per fare questo

È stato utile?

Soluzione

Il caso d'uso è comune, del tutto legittime e, purtroppo, più difficile di quanto ci si aspetterebbe. Per info sfondo si può leggere questa discussione e trovare un puntatore a una soluzione (menzionato anche nel thread) qui . La soluzione di Shay funziona bene.

In generale mi piacerebbe essere un po 'preoccupati di code illimitate; di solito è meglio avere esplicito controllo di flusso in entrata che si degrada con grazia e regola il rapporto tra la corrente di lavoro / rimanente per non sopraffare sia produttore o consumatore.

Altri suggerimenti

Probabilmente non è necessario micro-gestire il pool di thread ad essere richiesto.

A nella cache pool di thread si ri-utilizzare thread inattivi consentendo anche thread concorrenti potenzialmente illimitato. Questo ovviamente potrebbe portare a instabilità degradanti prestazioni dal cambio di contesto ambientale durante periodi impulsivi.

Executors.newCachedThreadPool();

Una soluzione migliore è quella di porre un limite al numero totale di thread scartando la nozione di assicurare thread inattivi vengono utilizzati prima. Le modifiche di configurazione potrebbe essere:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

Il ragionamento su questo scenario, se l'esecutore ha meno di discussioni corePoolSize, che non deve essere molto occupato. Se il sistema non è molto occupato, quindi c'è poco danno in filatura un nuovo thread. Facendo questo farà sì che il vostro ThreadPoolExecutor per creare sempre un nuovo lavoratore se è sotto il numero massimo di lavoratori consentito. Solo quando il numero massimo di lavoratori sono "esecuzione" saranno i lavoratori in attesa pigramente per le attività sarà dato compiti. Se un lavoratore attende aReasonableTimeDuration senza un compito, allora è permesso di interrompere. Utilizzando limiti ragionevoli per la dimensione del pool (dopo tutto, ci sono solo tante CPU) e una abbastanza grande timeout (per mantenere le discussioni da inutilmente di terminazione), saranno probabilmente visti i benefici desiderati.

L'ultima opzione è hacker. Fondamentalmente, il ThreadPoolExecutor utilizza internamente BlockingQueue.offer per determinare se la coda ha capacità. Un'implementazione personalizzata di BlockingQueue può sempre rifiutare il tentativo offer. Quando il ThreadPoolExecutor non riesce a offer un compito alla coda, si cercherà di fare un nuovo lavoratore. Se un nuovo lavoratore non può essere creato, un RejectedExecutionHandler sarebbe chiamato. A quel punto, un RejectedExecutionHandler personalizzato potrebbe costringere un put nella BlockingQueue personalizzato.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}

Basta impostare corePoolsize = maximumPoolSize e utilizzare una coda infinita?

Nel vostro elenco di punti, 1 esclude 2, dal momento che corePoolSize sarà sempre minore o uguale di maximumPoolSize.

Modifica

C'è ancora qualcosa di incompatibile tra ciò che si vuole e ciò TPE vi offrirà.

Se si dispone di una coda infinita, maximumPoolSize viene ignorata in modo, come si osserva, non più di discussioni corePoolSize potrà mai essere creato e utilizzato.

Quindi, ancora una volta, se si prende corePoolsize = maximumPoolSize con una coda infinita, si dispone di ciò che si vuole, no?

VUOI essere alla ricerca di qualcosa di più simile a un pool di thread cache?

http : //download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool ()

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top