Punto muerto en ThreadPool
-
09-06-2019 - |
Pregunta
No pude encontrar una implementación ThreadPool decente para Ruby, así que escribí la mía (basada en parte en el código de aquí: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 , pero se cambió a esperar/señalar y otra implementación para el cierre de ThreadPool.Sin embargo, después de un tiempo de ejecución (con 100 subprocesos y manejando alrededor de 1300 tareas), muere con un punto muerto en la línea 25: espera un nuevo trabajo allí.¿Alguna idea de por qué podría suceder?
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
Solución
Bien, entonces el principal problema con la implementación es:¿Cómo asegurarse de que no se pierda ninguna señal y evitar bloqueos?
En mi experiencia, esto es REALMENTE difícil de lograr con variables de condición y mutex, pero fácil con semáforos.Sucede que Ruby implementa un objeto llamado Queue (o SizedQueue) que debería resolver el problema.Aquí está mi implementación sugerida:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
Y aquí hay un código de prueba simple:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Otros consejos
Puedes probar el cola_trabajo gema, diseñada para coordinar el trabajo entre un productor y un grupo de subprocesos de trabajo.
Soy un poco parcial aquí, pero sugeriría modelar esto en algún lenguaje de proceso y verificarlo.Las herramientas disponibles gratuitamente son, por ejemplo, el conjunto de herramientas mCRL2 (que utiliza un lenguaje basado en ACP), Mobility Workbench (pi-calculus) y Spin (PROMELA).
De lo contrario, sugeriría eliminar todo el código que no sea esencial para el problema y encontrar un caso mínimo en el que se produzca el punto muerto.Dudo que los 100 subprocesos y las 1300 tareas sean esenciales para llegar a un punto muerto.Con un caso más pequeño, probablemente pueda agregar algunas impresiones de depuración que proporcionen suficiente información para resolver el problema.
Ok, el problema parece estar en tu método ThreadPool#signal.Lo que puede pasar es:
1 - Todos tus trabajadores están ocupados e intentas procesar un nuevo trabajo.
2 - la línea 90 obtiene un trabajador nulo
3: un trabajador se libera y lo señala, pero la señal se pierde porque ThreadPool no la está esperando.
4 - caes en la línea 95, esperando aunque haya un trabajador libre.
El error aquí es que puedes señalar a un trabajador libre incluso cuando nadie está escuchando.Este método ThreadPool#signal debería ser:
def signal
@mutex.synchronize { @cv.signal }
end
Y el problema es el mismo en el objeto Trabajador.Lo que podría pasar es:
1 - El trabajador acaba de completar un trabajo
2 - Comprueba (línea 17) si hay un trabajo en espera:no hay
3 - El grupo de subprocesos envía un nuevo trabajo y lo señala...pero se pierde la señal
4 - El trabajador espera señal, aunque esté marcado como ocupado
Deberías poner tu método de inicialización como:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
A continuación, los métodos Worker#get_block y Worker#reset_block ya no deberían sincronizarse.De esa manera, no puede asignar un bloque a un trabajador entre la prueba de un bloque y la espera de una señal.
El código de los principales comentaristas ha sido de gran ayuda a lo largo de los años.Aquí está actualizado para Ruby 2.x y mejorado con identificación de subprocesos.¿Cómo es eso una mejora?Cuando cada hilo tiene una ID, puede componer ThreadPool con una matriz que almacena información arbitraria.Algunas ideas:
- Sin matriz:Uso típico de ThreadPool.Incluso con GIL, hace que los subprocesos sean muy fáciles de codificar y muy útiles para aplicaciones de alta latencia como el rastreo web de gran volumen.
- ThreadPool y Array dimensionados según el número de CPU:procesos fáciles de bifurcar para usar todas las CPU,
- ThreadPool y Array dimensionados según la cantidad de recursos:por ejemplo, cada elemento de la matriz representa un procesador en un grupo de instancias, por lo que si tiene 10 instancias cada una con 4 CPU, el TP puede administrar el trabajo en 40 subprocesos.
Con estos dos últimos, en lugar de pensar en los subprocesos que realizan el trabajo, piense en ThreadPool que administra los subprocesos que están realizando el trabajo.La tarea de gestión es liviana y cuando se combina con subprocesos, a quién le importa el GIL.
¡Con esta clase, puede codificar un MapReduce basado en clúster en aproximadamente cien líneas de código!Este código es maravillosamente breve, aunque puede resultar un poco complicado asimilarlo por completo.Espero eso ayude.
# Usage:
#
# Thread.abort_on_exception = true # help localize errors while debugging
# pool = ThreadPool.new(thread_pool_size)
# 50.times {|i|
# pool.process { ... }
# or
# pool.process {|id| ... } # worker identifies itself as id
# }
# pool.shutdown()
class ThreadPool
require 'thread'
class ThreadPoolWorker
attr_accessor :id
def initialize(thread_queue, id)
@id = id # worker id is exposed thru tp.process {|id| ... }
@mutex = Mutex.new
@cv = ConditionVariable.new
@idle_queue = thread_queue
@running = true
@block = nil
@thread = Thread.new {
@mutex.synchronize {
while @running
@cv.wait(@mutex) # block until there is work to do
if @block
@mutex.unlock
begin
@block.call(@id)
ensure
@mutex.lock
end
@block = nil
end
@idle_queue << self
end
}
}
end
def set_block(block)
@mutex.synchronize {
raise RuntimeError, "Thread is busy." if @block
@block = block
@cv.signal # notify thread in this class, there is work to be done
}
end
def busy?
@mutex.synchronize { ! @block.nil? }
end
def stop
@mutex.synchronize {
@running = false
@cv.signal
}
@thread.join
end
def name
@thread.inspect
end
end
attr_accessor :max_size, :queue
def initialize(max_size = 10)
@process_mutex = Mutex.new
@max_size = max_size
@queue = Queue.new # of idle workers
@workers = [] # array to hold workers
# construct workers
@max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
# queue up workers (workers in queue are idle and available to
# work). queue blocks if no workers are available.
@max_size.times {|i| @queue << @workers[i] }
sleep 1 # important to give threads a chance to initialize
end
def size
@workers.size
end
def idle
@queue.size
end
# are any threads idle
def busy?
# @queue.size < @workers.size
@queue.size == 0 && @workers.size == @max_size
end
# block until all threads finish
def shutdown
@workers.each {|w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block = nil, &blk)
@process_mutex.synchronize {
block = blk if block_given?
worker = @queue.pop # assign to next worker; block until one is ready
worker.set_block(block) # give code block to worker and tell it to start
}
end
end