Question

Je suis en train d'utiliser un ThreadPoolExecutor pour programmer des tâches, mais en cours d'exécution dans quelques problèmes avec ses politiques. Voici son comportement déclaré:

  1. Si moins de threads corePoolSize fonctionnent, la Exécuteur préfère toujours l'ajout d'un nouveau fil de discussion plutôt que de faire la queue.
  2. Si corePoolSize ou plusieurs threads sont en cours d'exécution, l'Exécuteur préfère toujours faire la queue une demande plutôt que d'ajouter un nouveau thread.
  3. Si une demande ne peut pas être mis en attente, un nouveau thread est créé à moins que cela dépasserait maximumPoolSize, auquel cas, la tâche sera rejetée.

Le comportement que je veux est la suivante:

  1. comme ci-dessus
  2. Si plus de corePoolSize, mais moins que les fils sont en cours d'exécution maximumPoolSize, préfère ajouter un nouveau fil sur les files d'attente, et en utilisant un thread inactif sur l'ajout d'un nouveau thread.
  3. comme ci-dessus

En fait, je ne veux pas de tâches à rejeter; Je veux qu'ils soient mis en attente dans une file d'attente sans bornes. Mais je ne veux avoir jusqu'à maximumPoolSize fils. Si j'utilise une file d'attente sans bornes, il ne génère jamais des fils après qu'il frappe coreSize. Si j'utilise une file d'attente limitée, il rejette les tâches. Y at-il moyen de contourner cela?

Qu'est-ce que je pense à l'est maintenant en cours d'exécution sur un ThreadPoolExecutor SynchronousQueue, mais pas l'alimentation des tâches directement à lui - au lieu de les nourrir à un LinkedBlockingQueue sans bornes séparées. Puis un autre flux de fil de la LinkedBlockingQueue dans l'Exécuteur, et si l'on obtient rejeté, il essaie simplement de nouveau jusqu'à ce qu'il ne soit pas rejetée. Cela semble comme une douleur et un peu un hack, mais - est-il un moyen de plus propre à faire

Était-ce utile?

La solution

Votre cas d'utilisation est commune, tout à fait légitime et malheureusement plus difficile que l'on pourrait attendre. Pour info d'arrière-plan, vous pouvez lire cette discussion et trouver un pointeur vers une solution (également mentionnée dans le fil) . La solution de Shay fonctionne très bien.

En général, je serais un peu méfiant des files d'attente sans bornes; il est généralement préférable d'avoir un contrôle explicite de flux entrant qui se dégrade avec élégance et régule le rapport des travaux en cours / restant de ne pas submerger soit producteur ou consommateur.

Autres conseils

Il est sans doute pas nécessaire de micro-gérer le pool de threads comme demandé.

Un pool de threads mis en mémoire cache réutilisera threads inactifs tout en permettant également des threads concurrents potentiellement illimités. Bien sûr, cela pourrait conduire à une dégradation des performances d'emballement du changement de contexte en tête pendant les périodes bursty.

Executors.newCachedThreadPool();

Une meilleure option consiste à placer une limite sur le nombre total de fils tout en rejetant la notion d'assurer threads inactifs sont utilisés en premier. Les modifications de configuration seraient:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

Raisonnement sur ce scénario, si l'exécuteur a moins de fils de corePoolSize, qu'il ne doit pas être très occupé. Si le système est très occupé, alors il y a peu de mal à filer un nouveau fil. Faire cela entraînera votre ThreadPoolExecutor toujours créer un nouveau travailleur si elle est sous le nombre maximum de travailleurs autorisés. Seulement lorsque le nombre maximum de travailleurs sont « en cours d'exécution » seront les travailleurs attendent sans rien faire pour des tâches être donné des tâches. Si un travailleur attend aReasonableTimeDuration sans tâche, il est permis de mettre fin. En utilisant des limites raisonnables pour la taille de la piscine (après tout, il n'y a que tant de processeurs) et un délai d'attente assez grande (pour garder des fils de mettre fin à inutilement), les avantages escomptés seront probablement vus.

La dernière option est hackish. Fondamentalement, le ThreadPoolExecutor utilise en interne BlockingQueue.offer pour déterminer si la file d'attente a une capacité. Une implémentation personnalisée de BlockingQueue pourrait toujours rejeter la tentative de offer. Lorsque le ThreadPoolExecutor ne parvient pas à offer une tâche à la file d'attente, il va essayer de faire un nouveau travailleur. Si un nouveau travailleur ne peut pas être créé, un RejectedExecutionHandler serait appelé. À ce moment-là, un RejectedExecutionHandler personnalisé pourrait forcer un put dans le BlockingQueue personnalisé.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}

Il suffit de définir corePoolsize = maximumPoolSize et utiliser une file d'attente sans bornes?

Dans votre liste de points, 1 exclut 2, puisque corePoolSize sera toujours inférieure ou égale à maximumPoolSize.

Modifier

Il y a encore quelque chose incompatible entre ce que vous voulez et ce que TPE vous offrir.

Si vous avez une file d'attente sans bornes, maximumPoolSize est ignoré si, comme vous avez observé, pas plus que les fils de corePoolSize sera jamais créé et utilisé.

Alors, encore une fois, si vous prenez corePoolsize = maximumPoolSize avec une file d'attente sans bornes, vous avez ce que vous voulez, non?

Seriez-vous à la recherche de quelque chose de plus comme un pool de threads en cache?

http : //download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool ()

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top