Impasse em ThreadPool
-
09-06-2019 - |
Pergunta
Não consegui encontrar uma implementação decente de ThreadPool para Ruby, então escrevi a minha (baseada parcialmente no código daqui: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 , mas alterado para wait/signal e outras implementações para desligamento do ThreadPool.Porém depois de algum tempo de execução (tendo 100 threads e lidando com cerca de 1300 tarefas), ele morre com deadlock na linha 25 - ele aguarda um novo job lá.Alguma idéia, por que isso pode acontecer?
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
Solução
Ok, então o principal problema com a implementação é: como garantir que nenhum sinal é perdido e evitar bloqueios mortos
Na minha experiência, isso é muito difícil de alcançar com as variáveis ??de condição e de exclusão mútua, mas fácil com semáforos. Acontece que Ruby implementar um objeto chamado Queue (ou SizedQueue) que deve resolver o problema. Aqui está minha implementação sugerido:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
E aqui está um código de teste simples:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Outras dicas
Você pode tentar o work_queue gema, projetado para coordenar o trabalho entre um produtor e um pool de trabalhador threads.
Estou um pouco tendenciosa aqui, mas gostaria de sugerir modelar isso em alguma linguagem de processo e modelo de verificá-lo. Livremente ferramentas disponíveis são, por exemplo, o conjunto de ferramentas mCRL2 (usando uma linguagem baseada-ACP), o Workbench Mobility (pi-cálculo) e rotação (PROMELA).
Caso contrário, eu sugeriria remover todos os bits de código que não é essencial para o problema e encontrar um caso minimal onde o bloqueio ocorre. Duvido que os 100 threads e 1300 tarefas são essenciais para obter um impasse. Com um caso menor você pode provavelmente apenas adicionar alguns depuração impressões que fornecem informações suficientes a resolver o problema.
Ok, o problema parece estar em seu método # sinal ThreadPool. O que pode acontecer é:
1 - Todo o seu trabalho estão ocupados e tentar processar um novo emprego
2 - linha 90 recebe um trabalhador nil
3 - um trabalhador se libertou e sinais, mas o sinal é perdido como o ThreadPool não está esperando por ele
4 -. Você cair na linha 95, à espera, embora haja um trabalhador livre
O erro aqui é que você pode sinalizar um trabalhador livre, mesmo quando ninguém está escutando. Este método # sinal ThreadPool deve ser:
def signal
@mutex.synchronize { @cv.signal }
end
E o problema é o mesmo no objeto do trabalhador. O que pode acontecer é:
1 - O trabalhador acabou de completar um trabalho
2 - Verifica (linha 17) se houver uma espera de emprego: não existe
3 - O pool de threads enviar um novo emprego e sinais ... mas o sinal é perdido
4 - A espera trabalhador para um sinal, mesmo que ele é marcado como ocupado
Você deve colocar o seu método de inicialização como:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Em seguida, o trabalhador # get_block e Trabalhadores métodos # reset_block não devem ser sincronizados mais. Dessa forma, você não pode ter um bloco atribuído a um trabalhador entre o teste para um bloco e a espera por um sinal.
código do Top comentarista tem ajudado muito ao longo dos anos. Aqui ele é atualizado para 2.x rubi e melhorou com a identificação fio. Como é que uma melhoria? Quando cada segmento tem um ID, você pode compor ThreadPool com um array que armazena informações arbitrária. Algumas idéias:
- No array: uso típico ThreadPool. Mesmo com a GIL faz enfiando absolutamente fácil de código e muito útil para aplicações de alta latência, como alto volume de web crawling,
- ThreadPool e matriz dimensionada para o número de CPUs: fácil de processos de garfo usar todas as CPUs,
- ThreadPool e matriz dimensionada para o número de recursos: por exemplo, cada elemento da matriz representa um processador através de um conjunto de casos, por isso, se você tem 10 casos cada um com 4 CPUs, o TP pode gerenciar o trabalho em 40 subprocessos .
Com estes dois últimos, em vez de pensar sobre tópicos que fazem o trabalho pensar sobre o ThreadPool gerir subprocessos que estão fazendo o trabalho. A tarefa de gestão é leve e, quando combinado com subprocessos, que se preocupa com o GIL.
Com esta classe, você pode codificar uma MapReduce baseada em cluster em cerca de uma centena de linhas de código! Este código é bem curta, embora possa ser um pouco de uma mente-bend para Grokar totalmente. Espero que ajude.
# Usage:
#
# Thread.abort_on_exception = true # help localize errors while debugging
# pool = ThreadPool.new(thread_pool_size)
# 50.times {|i|
# pool.process { ... }
# or
# pool.process {|id| ... } # worker identifies itself as id
# }
# pool.shutdown()
class ThreadPool
require 'thread'
class ThreadPoolWorker
attr_accessor :id
def initialize(thread_queue, id)
@id = id # worker id is exposed thru tp.process {|id| ... }
@mutex = Mutex.new
@cv = ConditionVariable.new
@idle_queue = thread_queue
@running = true
@block = nil
@thread = Thread.new {
@mutex.synchronize {
while @running
@cv.wait(@mutex) # block until there is work to do
if @block
@mutex.unlock
begin
@block.call(@id)
ensure
@mutex.lock
end
@block = nil
end
@idle_queue << self
end
}
}
end
def set_block(block)
@mutex.synchronize {
raise RuntimeError, "Thread is busy." if @block
@block = block
@cv.signal # notify thread in this class, there is work to be done
}
end
def busy?
@mutex.synchronize { ! @block.nil? }
end
def stop
@mutex.synchronize {
@running = false
@cv.signal
}
@thread.join
end
def name
@thread.inspect
end
end
attr_accessor :max_size, :queue
def initialize(max_size = 10)
@process_mutex = Mutex.new
@max_size = max_size
@queue = Queue.new # of idle workers
@workers = [] # array to hold workers
# construct workers
@max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
# queue up workers (workers in queue are idle and available to
# work). queue blocks if no workers are available.
@max_size.times {|i| @queue << @workers[i] }
sleep 1 # important to give threads a chance to initialize
end
def size
@workers.size
end
def idle
@queue.size
end
# are any threads idle
def busy?
# @queue.size < @workers.size
@queue.size == 0 && @workers.size == @max_size
end
# block until all threads finish
def shutdown
@workers.each {|w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block = nil, &blk)
@process_mutex.synchronize {
block = blk if block_given?
worker = @queue.pop # assign to next worker; block until one is ready
worker.set_block(block) # give code block to worker and tell it to start
}
end
end