Pergunta

Parece ser impossível fazer um pool de threads em cache com um limite para o número de threads que ele pode criar.

Aqui está como Executors.newCachedThreadPool estática é implementado na biblioteca Java padrão:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

Assim, usando esse modelo para ir em criar um pool de threads em cache de tamanho fixo:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Agora, se você usar este e submetê-3 tarefas, tudo vai ficar bem. Submeter quaisquer outras tarefas irá resultar em exceções de execução rejeitados.

Tentando isto:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

resultará em todas as threads em execução sequencialmente. Ou seja, o pool de threads nunca vai fazer mais de um thread para lidar com suas tarefas.

Este é um erro no método de execução de ThreadPoolExecutor? Ou talvez isso é intencional? Ou há alguma outra maneira?

Edit: Eu quero algo exatamente como o pool de threads em cache (ele cria tópicos sobre a demanda e, em seguida, mata-los depois de algum tempo de espera), mas com um limite para o número de segmentos que podem criar e a capacidade de continuar a fila tarefas adicionais uma vez que tenha atingido o seu limite de encadeamentos. De acordo com a resposta de sjlee isso é impossível. Olhando para o método execute () de ThreadPoolExecutor é realmente impossível. Eu precisaria subclasse ThreadPoolExecutor e substituir execute () um pouco como SwingWorker faz, mas o que SwingWorker faz em sua execução () é um hack completo.

Foi útil?

Solução

O ThreadPoolExecutor tem as seguintes vários comportamentos-chave, e os seus problemas pode ser explicada por esses comportamentos.

Quando as tarefas são submetidos,

  1. Se o pool de threads não atingiu o tamanho do núcleo, ele cria novos tópicos.
  2. Se o tamanho do núcleo foi atingido e não há segmentos ociosos, ele enfileira tarefas.
  3. Se o tamanho do núcleo foi atingido, não há segmentos ociosos, e a fila ficar cheia, ele cria novos tópicos (até atingir o tamanho máximo).
  4. Se o tamanho máximo foi alcançado, não há segmentos ociosos, e a fila ficar cheia, os chutes de política rejeição em.

No primeiro exemplo, nota que o SynchronousQueue tem essencialmente tamanho de 0. Portanto, no momento em que atingir o tamanho máximo (3), os pontapés de política rejeição em (# 4).

No segundo exemplo, a fila de escolha é um LinkedBlockingQueue que tem um tamanho ilimitado. Portanto, você ficar preso com o comportamento # 2.

Você não pode realmente mexer muito com o tipo de cache ou do tipo fixo, como seu comportamento é quase completamente determinada.

Se você quiser ter um pool de threads delimitada e dinâmica, você precisa usar um tamanho de núcleo positivo e tamanho máximo combinado com uma fila de um tamanho finito. Por exemplo,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Adenda : esta é uma resposta bastante velho, e parece que JDK mudou seu comportamento quando se trata de tamanho do núcleo de 0. Desde JDK 1.6, se o tamanho do núcleo é 0 ea piscina faz não tem nenhum threads, o ThreadPoolExecutor irá adicionar uma linha para executar essa tarefa. Portanto, o tamanho do núcleo de 0 é uma excepção à regra acima. Graças Steve para trazendo que a minha atenção.

Outras dicas

A menos que eu perdi alguma coisa, a solução para a pergunta original é simples. Os seguintes instrumentos de código o comportamento desejado como descrito pelo autor original. Ele vai gerar até 5 tópicos para trabalhar em uma fila ilimitada e segmentos ociosos terminará após 60 segundos.

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

Tive mesmo problema. Desde há outros puts responder a todas as questões em conjunto, eu estou adicionando meu:

Agora, é claramente escrito na docs : Se você usa uma fila que não configuração de blocos (LinkedBlockingQueue) tópicos Max tem nenhum efeito, apenas tópicos principais são usados.

forma:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

Este executor tem:

  1. No conceito de threads max como estamos usando uma fila sem limites. Isso é uma coisa boa porque tal fila pode causar executor para criar grande número de não-core, tópicos extras se ele segue sua política habitual.

  2. Uma fila de Integer.MAX_VALUE tamanho máximo. Submit() vai jogar RejectedExecutionException se o número de tarefas pendentes excede Integer.MAX_VALUE. Não tenho certeza que vai ficar sem memória primeira ou isso vai acontecer.

  3. Tem 4 tópicos principais possíveis. tópicos centrais ociosas sai automaticamente se o inativo por 5 seconds.So, sim, estritamente sob demanda threads.Number pode ser variada utilizando o método setThreads().

  4. Garante min número de tópicos centrais nunca é menor que um, ou então submit() irá rejeitar todas as tarefas. Desde tópicos centrais precisam ser> = tópicos max o método setThreads() define tópicos max bem, embora configuração rosca max é inútil para uma fila sem limites.

Em seu primeiro exemplo, tarefas subsequentes são rejeitados porque o AbortPolicy é o RejectedExecutionHandler padrão. O ThreadPoolExecutor contém as seguintes políticas, que você pode mudar através do método setRejectedExecutionHandler:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

Parece que você quer em cache pool de threads com um CallerRunsPolicy.

Nenhuma das respostas aqui fixado o meu problema, que tinha a ver com a criação de uma quantidade limitada de conexões HTTP usando o cliente HTTP Apache (versão 3.x). Desde que me levou algumas horas para descobrir uma boa configuração, eu vou compartilhar:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

Isso cria um ThreadPoolExecutor que começa com cinco e detém um máximo de dez rodando simultaneamente tópicos usando CallerRunsPolicy para a execução.

Pelo Javadoc para ThreadPoolExecutor:

Se houver mais de corePoolSize mas menos de tópicos maximumPoolSize execução, um novo segmento será criado somente se a fila está cheia . Ao definir corePoolSize e maximumPoolSize o mesmo, você cria um pool de threads de tamanho fixo.

(grifo meu).

A resposta de jitter é o que você quer, embora a minha resposta à sua outra pergunta. :)

não é mais uma opção. Em vez de usar nova SynchronousQueue você pode usar qualquer outra fila também, mas você tem que certificar-se de seu tamanho é 1, de modo que forçará ExecutorService para criar novo segmento.

Não parece como se qualquer uma das respostas realmente responder à pergunta - na verdade, eu não posso ver uma maneira de fazer isso - mesmo se você subclasse de PooledExecutorService uma vez que muitos dos métodos / propriedades são privadas, por exemplo fazendo addIfUnderMaximumPoolSize foi protegido você poderia fazer o seguinte:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

O mais próximo que eu obtive - mas mesmo isso não é uma solução muito boa

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

P.S. não testado acima

Este é o que você quer (pelo menos eu acho que sim). Para uma verificação explicação Jonathan Feinberg responder

Executors.newFixedThreadPool(int n)

Cria um pool de threads que reutiliza um número fixo de tópicos que operam fora de uma fila ilimitada compartilhada. Em qualquer ponto, na maioria nThreads tópicos serão tarefas de processamento ativos. Se tarefas adicionais são submetidos quando os tópicos são ativos, eles vão esperar na fila até que um segmento está disponível. Se qualquer segmento é encerrado devido a uma falha durante a execução antes do desligamento, um novo vai tomar o seu lugar se necessário para executar tarefas subsequentes. Os tópicos na piscina existirá até que seja explicitamente desligamento.

Aqui está uma outra solução. Eu acho que isso comporta solução como você quer que ele (embora não orgulhosos desta solução):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);
  1. Você pode usar ThreadPoolExecutor como sugerido por @sjlee

    Você pode controlar o tamanho da piscina de forma dinâmica. Ter um olhar para esta questão para mais detalhes:

    Tópico dinâmico Piscina

    ou

  2. Você pode usar newWorkStealingPool API, que foi introduzido com o java 8.

    public static ExecutorService newWorkStealingPool()
    

    Cria um pool de threads de roubo de trabalho usando todos os processadores disponíveis como seu nível de alvo paralelismo.

Por padrão, o nível de paralelismo é definida como o número de núcleos de CPU no seu servidor. Se você tem 4 servidor core CPU, tamanho do pool de threads seria 4. Este API retorna tipo ForkJoinPool de ExecutorService e permitir roubo de trabalho de segmentos ociosos roubando tarefas de segmentos ocupados em ForkJoinPool.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top