Domanda

Esiste un modo per creare Executor che avrà sempre almeno 5 thread e un massimo di 20 thread e una coda illimitata per le attività (il che significa che nessuna attività viene rifiutata)

Ho provato il nuovo ThreadPoolExecutor (5, 20, 60L, TimeUnit.SECONDS, coda) con tutte le possibilità a cui ho pensato per la coda:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait, after 20, they are rejected

e nessuno ha funzionato come desiderato.

È stato utile?

Soluzione

Forse qualcosa del genere funzionerebbe per te? L'ho appena montato, quindi per favore cercalo. Fondamentalmente, implementa un pool di thread di overflow che viene utilizzato per alimentare il ThreadPoolExecutor

sottostante

Ci sono due principali svantaggi che vedo con esso:

  • La mancanza di un oggetto Future restituito su submit () . Ma forse non è un problema per te.
  • La coda secondaria verrà svuotata nel ThreadPoolExecutor solo quando i lavori vengono inviati. Deve esserci una soluzione elegante, ma non la vedo ancora. Se sai che ci sarà un flusso costante di attività nel StusMagicExecutor , questo potrebbe non essere un problema. ("Può essere" la parola chiave.) Un'opzione potrebbe essere quella di fare in modo che le tue attività inviate colpiscano il StusMagicExecutor dopo che sono state completate?

Stu's Magic Executor:

public class StusMagicExecutor extends ThreadPoolExecutor {
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();  //capacity is Integer.MAX_VALUE.

    public StusMagicExecutor() {
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());  
    }
    public void queueRejectedTask(Runnable task) {
        try {
            secondaryQueue.put(task);
        } catch (InterruptedException e) {
            // do something
        }
    }
    public Future submit(Runnable newTask) {
        //drain secondary queue as rejection handler populates it
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        for (Runnable task : tasks)
             super.submit(task);

        return null; //does not return a future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        ((StusMagicExecutor)executor).queueRejectedTask(runnable);
    }
}

Altri suggerimenti

I javadocs per ThreadPoolExecutor sono abbastanza chiari che una volta creati i thread corePoolSize , i nuovi thread verranno creati solo quando la coda sarà piena. Quindi se imposti core su 5 e max su 20, non otterrai mai il comportamento desiderato.

Tuttavia, se imposti core e max su 20, le attività verranno aggiunte alla coda solo se tutti e 20 i thread sono occupati. Naturalmente, questo rende il tuo "minimo 5 thread" minimo " requisito un po 'insignificante, dal momento che tutti e 20 saranno mantenuti in vita (fino a quando non saranno inattivi, comunque).

Penso che questo problema sia un difetto della classe e molto fuorviante date le combinazioni di parametri del costruttore. Ecco una soluzione presa dal ThreadPoolExecutor interno di SwingWorker che ho trasformato in una classe di alto livello. Non ha un minimo ma almeno utilizza un limite superiore. L'unica cosa che non so è quale impatto sulle prestazioni ottieni dall'esecuzione del blocco.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition unpaused = pauseLock.newCondition();
    private boolean isPaused = false;
    private final ReentrantLock executeLock = new ReentrantLock();

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                handler);
    }

    public BoundedThreadPoolExecutor(int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(0, maximumPoolSize, keepAliveTime, unit, workQueue,
                threadFactory, handler);
    }

    @Override
    public void execute(Runnable command) {
        executeLock.lock();
        try {
            pauseLock.lock();
            try {
                isPaused = true;
            } finally {
                pauseLock.unlock();
            }
            setCorePoolSize(getMaximumPoolSize());
            super.execute(command);
            setCorePoolSize(0);
            pauseLock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                pauseLock.unlock();
            }
        } finally {
            executeLock.unlock();
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException ignore) {

        } finally {
            pauseLock.unlock();
        }
    }
}
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top