Blocage dans ThreadPool
-
09-06-2019 - |
Question
Je ne pouvais pas trouver une implémentation ThreadPool décente pour Ruby, alors j’ai écrit la mienne (basée en partie sur du code d’ici: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show / 3276 , mais modifié en attente / signal et autre implémentation pour l’arrêt de ThreadPool. Toutefois, après un certain temps d’exécution (100 threads traités et environ 1 300 tâches), il meurt avec blocage dans la ligne 25 - attend une nouvelle Il y a des idées, pourquoi cela pourrait-il arriver?
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
La solution
Ok, le principal problème de cette implémentation est le suivant: comment s’assurer qu’aucun signal n’est perdu et éviter les blocages??
Selon mon expérience, c’est VRAIMENT difficile à obtenir avec les variables de condition et le mutex, mais facile avec les sémaphores. Il se trouve que ruby ??implémente un objet appelé Queue (ou SizedQueue) qui devrait résoudre le problème. Voici ma suggestion d'implémentation:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
Et voici un code de test simple:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Autres conseils
Vous pouvez essayer le work_queue , conçu pour coordonner le travail entre un producteur et un pool de travailleurs. discussions.
Je suis légèrement biaisé ici, mais je suggérerais de modéliser ceci dans un langage de processus et de le vérifier par modèle. Les outils disponibles gratuitement sont, par exemple, le jeu d’outils mCRL2 (utilisant un langage basé sur ACP), Mobility Workbench (pi-calcul) et Spin (PROMELA).
Sinon, je suggérerais de supprimer tous les éléments de code qui ne sont pas essentiels au problème et de rechercher un cas minimal où l'impasse se produirait. Je doute que les 100 threads et les 1300 tâches soient essentiels pour obtenir une impasse. Avec un cas plus petit, vous pouvez probablement simplement ajouter des impressions de débogage fournissant suffisamment d’informations pour résoudre le problème.
Ok, le problème semble être lié à votre méthode de signal ThreadPool #. Ce qui peut arriver est:
1 - Tous vos employés sont occupés et vous essayez de traiter un nouvel emploi
2 - la ligne 90 obtient un ouvrier nul
3 - un opérateur est libéré et le signale, mais le signal est perdu car le ThreadPool ne l'attend pas
4 - vous tombez sur la ligne 95 et attendez même s'il y a un travailleur libre.
L’erreur ici est que vous pouvez signaler un ouvrier libre même lorsque personne n’écoute. Cette méthode de signal ThreadPool # devrait être:
def signal
@mutex.synchronize { @cv.signal }
end
Et le problème est le même dans l'objet Worker. Ce qui pourrait arriver est:
1 - Le travailleur vient de terminer un travail
2 - Il vérifie (ligne 17) s'il y a un travail en attente: il n'y en a pas
3 - Le pool de threads envoie un nouveau travail et le signale ... mais le signal est perdu
4 - Le travailleur attend un signal, même s'il est marqué comme occupé
Vous devez définir votre méthode d'initialisation comme suit:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Ensuite, les méthodes Worker # get_block et Worker # reset_block ne doivent plus être synchronisées. De cette façon, vous ne pouvez pas affecter de bloc à un opérateur entre le test de bloc et l’attente d’un signal.
Le code du meilleur commentateur a beaucoup aidé au fil des ans. Ici, il est mis à jour pour ruby ??2.x et amélioré avec l'identification des threads. Comment est-ce une amélioration? Lorsque chaque thread a un identifiant, vous pouvez composer ThreadPool avec un tableau stockant des informations arbitraires. Quelques idées:
- Pas de tableau: utilisation typique de ThreadPool. Même avec le GIL, le threading est facile à coder et très utile pour les applications à latence élevée telles que l’analyse Web à volume élevé,
- ThreadPool et Array dimensionnés en fonction du nombre de processeurs: processus faciles à transformer pour utiliser tous les processeurs,
- ThreadPool et Array dimensionnés en nombre de ressources: chaque élément de tableau, par exemple, représente un processeur dans un pool d'instances. Par conséquent, si vous disposez de 10 instances avec 4 processeurs, le TP peut gérer le travail en 40 sous-processus.
Avec ces deux derniers, plutôt que de penser aux threads effectuant un travail, pensez au ThreadPool qui gère les sous-processus qui effectuent le travail. La tâche de gestion est légère et combinée à des sous-processus, qui se soucie de la GIL.
Avec cette classe, vous pouvez coder MapReduce en cluster en une centaine de lignes de code! Ce code est magnifiquement court, bien que cela puisse être un peu compliqué d’être complètement réussi. J'espère que ça aide.
# Usage:
#
# Thread.abort_on_exception = true # help localize errors while debugging
# pool = ThreadPool.new(thread_pool_size)
# 50.times {|i|
# pool.process { ... }
# or
# pool.process {|id| ... } # worker identifies itself as id
# }
# pool.shutdown()
class ThreadPool
require 'thread'
class ThreadPoolWorker
attr_accessor :id
def initialize(thread_queue, id)
@id = id # worker id is exposed thru tp.process {|id| ... }
@mutex = Mutex.new
@cv = ConditionVariable.new
@idle_queue = thread_queue
@running = true
@block = nil
@thread = Thread.new {
@mutex.synchronize {
while @running
@cv.wait(@mutex) # block until there is work to do
if @block
@mutex.unlock
begin
@block.call(@id)
ensure
@mutex.lock
end
@block = nil
end
@idle_queue << self
end
}
}
end
def set_block(block)
@mutex.synchronize {
raise RuntimeError, "Thread is busy." if @block
@block = block
@cv.signal # notify thread in this class, there is work to be done
}
end
def busy?
@mutex.synchronize { ! @block.nil? }
end
def stop
@mutex.synchronize {
@running = false
@cv.signal
}
@thread.join
end
def name
@thread.inspect
end
end
attr_accessor :max_size, :queue
def initialize(max_size = 10)
@process_mutex = Mutex.new
@max_size = max_size
@queue = Queue.new # of idle workers
@workers = [] # array to hold workers
# construct workers
@max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
# queue up workers (workers in queue are idle and available to
# work). queue blocks if no workers are available.
@max_size.times {|i| @queue << @workers[i] }
sleep 1 # important to give threads a chance to initialize
end
def size
@workers.size
end
def idle
@queue.size
end
# are any threads idle
def busy?
# @queue.size < @workers.size
@queue.size == 0 && @workers.size == @max_size
end
# block until all threads finish
def shutdown
@workers.each {|w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block = nil, &blk)
@process_mutex.synchronize {
block = blk if block_given?
worker = @queue.pop # assign to next worker; block until one is ready
worker.set_block(block) # give code block to worker and tell it to start
}
end
end