Domanda

Sembra impossibile creare un pool di thread nella cache con un limite al numero di thread che può creare.

Ecco come viene implementato Executors.newCachedThreadPool statico nella libreria Java standard:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Quindi, usando quel modello per continuare a creare un pool di thread nella cache di dimensioni fisse:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Ora se lo usi e invii 3 attività, tutto andrà bene. L'invio di ulteriori attività comporterà eccezioni di esecuzione rifiutate.

Prova questo:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

Il risultato sarà l'esecuzione sequenziale di tutti i thread. Ad esempio, il pool di thread non realizzerà mai più di un thread per gestire le attività.

Questo è un bug nel metodo di esecuzione di ThreadPoolExecutor? O forse questo è intenzionale? O c'è un altro modo?

Modifica: voglio qualcosa di simile al pool di thread nella cache (crea thread su richiesta e poi li uccide dopo un certo timeout) ma con un limite al numero di thread che può creare e alla possibilità di continuare a mettere in coda attività aggiuntive una volta raggiunto il limite del thread. Secondo la risposta di Sjlee questo è impossibile. Guardando il metodo execute () di ThreadPoolExecutor è davvero impossibile. Avrei bisogno di sottoclassare ThreadPoolExecutor e sovrascrivere execute () in qualche modo come fa SwingWorker, ma ciò che SwingWorker fa nel suo execute () è un hack completo.

È stato utile?

Soluzione

ThreadPoolExecutor ha i seguenti comportamenti chiave e i tuoi problemi possono essere spiegati da questi comportamenti.

Quando vengono inoltrate attività,

  1. Se il pool di thread non ha raggiunto la dimensione del core, crea nuovi thread.
  2. Se la dimensione del core è stata raggiunta e non ci sono thread inattivi, mette in coda le attività.
  3. Se la dimensione del core è stata raggiunta, non ci sono thread inattivi e la coda si riempie, crea nuovi thread (fino a raggiungere la dimensione massima).
  4. Se è stata raggiunta la dimensione massima, non ci sono thread inattivi e la coda si riempie, entra in vigore la politica di rifiuto.

Nel primo esempio, nota che SynchronousQueue ha essenzialmente una dimensione di 0. Pertanto, nel momento in cui raggiungi la dimensione massima (3), inizia la politica di rifiuto (# 4).

Nel secondo esempio, la coda di scelta è un LinkedBlockingQueue che ha una dimensione illimitata. Pertanto, rimani bloccato con il comportamento # 2.

Non puoi davvero armeggiare molto con il tipo memorizzato nella cache o il tipo fisso, poiché il loro comportamento è quasi completamente determinato.

Se si desidera disporre di un pool di thread limitato e dinamico, è necessario utilizzare una dimensione core positiva e una dimensione massima combinate con una coda di dimensioni finite. Ad esempio,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Addendum : questa è una risposta abbastanza vecchia e sembra che JDK abbia cambiato il suo comportamento quando si tratta di dimensioni del core di 0. Da JDK 1.6, se la dimensione del core è 0 e il pool lo fa non ha thread, ThreadPoolExecutor aggiungerà un thread per eseguire tale attività. Pertanto, la dimensione del nucleo di 0 è un'eccezione alla regola sopra. Grazie Steve per portandolo alla mia attenzione.

Altri suggerimenti

A meno che non mi sia perso qualcosa, la soluzione alla domanda originale è semplice. Il codice seguente implementa il comportamento desiderato come descritto dal poster originale. Genererà fino a 5 thread per lavorare su una coda illimitata e i thread inattivi termineranno dopo 60 secondi.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

Aveva lo stesso problema. Poiché nessun'altra risposta mette insieme tutti i problemi, sto aggiungendo il mio:

Ora è chiaramente scritto in docs : Se si utilizza una coda che non blocca ( LinkedBlockingQueue ) l'impostazione dei thread max non ha alcun effetto, vengono utilizzati solo i thread core.

così:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Questo esecutore ha:

  1. Nessun concetto di thread max poiché stiamo usando una coda illimitata. Questa è una buona cosa perché tale coda può causare all'esecutore la creazione di un numero enorme di thread extra non core se segue la sua politica abituale.

  2. Una coda di dimensione massima Integer.MAX_VALUE . Submit () genererà RejectedExecutionException se il numero di attività in sospeso supera Integer.MAX_VALUE . Non sono sicuro che prima saremo a corto di memoria o ciò accadrà.

  3. Ha 4 thread core possibili. I thread core inattivi terminano automaticamente se inattivi per 5 secondi. Quindi, sì, thread strettamente su richiesta. Il numero può essere variato usando il metodo setThreads () .

  4. Si assicura che il numero minimo di thread principali non sia mai inferiore a uno, altrimenti submit () rifiuterà ogni attività. Poiché i thread core devono essere > = max thread, il metodo setThreads () imposta anche i thread max, sebbene l'impostazione dei thread max sia inutile per una coda illimitata.

Nel tuo primo esempio, le attività successive vengono rifiutate perché AbortPolicy è il RejectedExecutionHandler predefinito. ThreadPoolExecutor contiene i seguenti criteri, che è possibile modificare tramite il metodo setRejectedExecutionHandler :

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Sembra che tu voglia un pool di thread nella cache con un CallerRunsPolicy.

Nessuna delle risposte qui risolto il mio problema, che riguardava la creazione di un numero limitato di connessioni HTTP utilizzando il client HTTP di Apache (versione 3.x). Dal momento che mi ci sono volute alcune ore per capire una buona installazione, condividerò:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Questo crea un ThreadPoolExecutor che inizia con cinque e contiene un massimo di dieci thread in esecuzione simultaneamente usando CallerRunsPolicy per l'esecuzione.

Per Javadoc per ThreadPoolExecutor:

  

Se sono in esecuzione più di corePoolSize ma meno di maximumPoolSize in esecuzione, verrà creato un nuovo thread solo se la coda è piena . Impostando corePoolSize e maximumPoolSize allo stesso modo, si crea un pool di thread di dimensioni fisse.

(Enfasi mia.)

la risposta di jitter è ciò che vuoi, anche se la mia risponde alla tua altra domanda. :)

c'è un'altra opzione. Invece di usare la nuova SynchronousQueue puoi usare anche qualsiasi altra coda, ma devi assicurarti che la sua dimensione sia 1, in modo da forzare il servizio di esecuzione per creare un nuovo thread.

Non sembra che nessuna delle risposte risponda effettivamente alla domanda - in effetti non riesco a vedere un modo per farlo - anche se esegui la sottoclasse da PooledExecutorService poiché molti dei metodi / proprietà sono privati, ad es. rendendo addIfUnderMaximumPoolSize protetto, è possibile effettuare le seguenti operazioni:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

Il più vicino che ho avuto è stato questo, ma anche questa non è un'ottima soluzione

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

P.S. non testato quanto sopra

Questo è quello che vuoi (almeno credo di sì). Per una spiegazione, controlla Jonathan Feinberg rispondere

Executors.newFixedThreadPool (int n)

  

Crea un pool di thread che riutilizza un numero fisso di thread che operano su una coda non associata condivisa. In ogni momento, al massimo i thread di nThreads saranno attività di elaborazione attive. Se vengono inviate attività aggiuntive quando tutti i thread sono attivi, attenderanno in coda fino a quando non sarà disponibile un thread. Se un thread termina a causa di un errore durante l'esecuzione prima dell'arresto, ne sostituirà uno nuovo, se necessario, per eseguire le attività successive. I thread nel pool esistono fino a quando non vengono esplicitamente arrestati.

Ecco un'altra soluzione. Penso che questa soluzione si comporti come vuoi tu (anche se non orgogliosa di questa soluzione):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
  1. Puoi utilizzare ThreadPoolExecutor come suggerito da @sjlee

    È possibile controllare dinamicamente le dimensioni del pool. Dai un'occhiata a questa domanda per maggiori dettagli:

    Dynamic Thread Pool

    o

  2. Puoi usare newWorkStealingPool API, introdotta con java 8.

    public static ExecutorService newWorkStealingPool()
    
      

    Crea un pool di thread che ruba il lavoro usando tutti i processori disponibili come livello di parallelismo di destinazione.

Per impostazione predefinita, il livello di parallelismo è impostato sul numero di core della CPU nel server. Se si dispone di un server CPU a 4 core, la dimensione del pool di thread sarebbe 4. Questa API restituisce il tipo ForkJoinPool di ExecutorService e consente il lavoro rubando i thread inattivi rubando le attività dai thread occupati in ForkJoinPool.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top