Pergunta

Eu estou tentando usar um ThreadPoolExecutor para agendar tarefas, mas a execução de alguns problemas com as suas políticas.Aqui está o seu comportamento:

  1. Se menos de corePoolSize threads em execução, o Executor sempre prefere a adição de um novo thread, em vez de enfileiramento de mensagens.
  2. Se corePoolSize ou mais threads de execução, o Executor sempre prefere fila de um pedido, em vez de adicionar um novo thread.
  3. Se um pedido não pode ser na fila, uma nova thread é criada a menos que isso excederia maximumPoolSize, caso em que a tarefa será rejeitado.

O comportamento que eu quero é este:

  1. o mesmo que acima
  2. Se mais de corePoolSize, mas menos do que maximumPoolSize threads em execução, prefere adicionar uma nova discussão sobre o enfileiramento de mensagens, e usando um thread ocioso sobre a adição de um novo segmento.
  3. o mesmo que acima

Basicamente, eu não quero que as tarefas sejam rejeitados;Eu quero que eles sejam colocados em fila em um unbounded fila.Mas eu quero ter até maximumPoolSize threads.Se eu usar um unbounded fila, nunca gera threads após bate coreSize.Se eu usar um delimitada fila, rejeita tarefas.Existe alguma maneira de contornar isso?

O que eu estou pensando agora é executar o ThreadPoolExecutor em um SynchronousQueue, mas não se alimentando de tarefas diretamente para ele - em vez de alimentá-los para uma outra ligada LinkedBlockingQueue.Em seguida, outro thread feeds de LinkedBlockingQueue para o Executor, e se for rejeitado, ele simplesmente tenta novamente até que ele não é rejeitada.Esta parece ser uma dor e um pouco de um hack, porém, existe uma maneira mais limpa de o fazer?

Foi útil?

Solução

Seu caso de uso é comum, completamente legítimo e, infelizmente, mais difícil do que se poderia esperar. Para informações de fundo que você pode ler esta discussão e encontre um ponteiro para uma solução (também mencionada no tópico) aqui. A solução de Shay funciona bem.

Geralmente eu ficaria um pouco cauteloso com filas ilimitadas; Geralmente, é melhor ter controle de fluxo explícito que se degrada graciosamente e regula a proporção do trabalho atual/restante para não sobrecarregar o produtor ou o consumidor.

Outras dicas

Provavelmente não é necessário micro-gerenciar o pool de threads como sendo solicitado.

A cache de uma thread do pool de re-uso de threads ociosos enquanto que também permite potencialmente ilimitado threads simultâneos.Isso, claro, pode levar à fuga de desempenho degradante da troca de contexto sobrecarga durante períodos de impulso.

Executors.newCachedThreadPool();

Uma opção melhor é colocar um limite sobre o número total de threads enquanto rejeitam a noção de garantir threads ociosos são utilizados pela primeira vez.As alterações de configuração seria:

corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);

O raciocínio sobre este cenário, se o executor tem menos de corePoolSize threads, que não deve ser muito ocupado.Se o sistema não estiver muito ocupado, então não é de um pouco de mal a rodar um novo thread.Isso fará com que seu ThreadPoolExecutor para criar um novo trabalhador se ele estiver sob o número máximo de trabalhadores permitido.Apenas quando o número máximo de trabalhadores são "running" serão os trabalhadores de braços cruzados esperando para tarefas sejam atribuídas tarefas.Se um trabalhador espera aReasonableTimeDuration sem uma tarefa e, em seguida, é permitido terminar.Usando limites do razoável, para o pool de tamanho (afinal, há apenas muitas CPUs) e razoavelmente grandes de tempo de espera (para manter threads desnecessariamente terminação), os benefícios desejados provavelmente vai ser visto.

A opção final é hackish.Basicamente, o ThreadPoolExecutor internamente usa BlockingQueue.offer para determinar se a fila tem capacidade.Uma implementação personalizada de BlockingQueue poderia rejeitar sempre o offer tentativa.Quando o ThreadPoolExecutor falha ao offer uma tarefa para a fila, ele vai tentar fazer um novo trabalho.Se um novo trabalhador não pode ser criada, uma RejectedExecutionHandler deve ser chamado.Nesse ponto, um costume RejectedExecutionHandler pode forçar uma put na personalizado BlockingQueue.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
    BlockingQueue<T> delegate;

    public boolean offer(T item) {
        return false;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        delegate.put(r);
    }

    //.... delegate methods
}

Apenas definido corePoolsize = maximumPoolSize e usar uma fila ilimitada?

Em sua lista de pontos, 1 exclui 2, pois corePoolSize sempre será menor ou igual que maximumPoolSize.

Editar

Ainda há algo incompatível entre o que você deseja e o que o TPE oferecerá.

Se você tem uma fila ilimitada, maximumPoolSize é ignorado assim, como você observou, não mais do que corePoolSize Os threads serão criados e usados.

Então, novamente, se você tomar corePoolsize = maximumPoolSize Com uma fila ilimitada, você tem o que quer, não?

Você estaria procurando algo mais como uma piscina de fios em cache?

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/executors.html#newcachedthreadpool ()

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top