Deadlock in Threadpool
-
09-06-2019 - |
Frage
Ich kann nicht eine anständige Thread Implementierung für Ruby finden, also schreibe ich mich (basierte zum Teil auf Code von hier: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show . / 3276 , sondern warten / Signal und andere Implementierung für Thread Abschaltung jedoch lief nach einiger Zeit (mit 100 Threads und Handhabung etwa 1300 Aufgaben) geändert wird, stirbt sie auf Linie mit Deadlock 25 - es für ein neues wartet Irgendwelche Ideen dort Arbeit geleistet., warum es könnte passieren?
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
Lösung
Ok, also das Hauptproblem bei der Umsetzung: Wie um sicherzustellen, dass kein Signal verloren und tot Sperren vermeiden
?Nach meiner Erfahrung ist es wirklich schwer, mit Bedingungsvariablen und Mutex zu erreichen, aber leicht mit Semaphore. Nun ist es so, dass Ruby ein Objekt namens Queue (oder SizedQueue) implementieren, die das Problem lösen sollte. Hier ist meine vorgeschlagene Umsetzung:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
Und hier ist ein einfacher Test Code:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Andere Tipps
Sie können versuchen, das work_queue Juwel, entworfen Arbeit zwischen einem Hersteller zu koordinieren und zu einem Pool von Arbeits Threads.
Ich bin hier etwas voreingenommen, aber ich würde vorschlagen, diese Sprache in einiger Prozessmodellierung und Modell es überprüfen. Frei verfügbare Tools sind zum Beispiel der mCRL2 Toolset (eine ACP-basierte Sprache verwendet wird), der Mobility-Workbench (pi-Kalkül) und Spin (Promela).
Ansonsten würde ich vorschlagen, jedes Stück Code zu entfernen, die nicht wesentlich für das Problem ist und einen minimalen Fall zu finden, wo der Stillstand auftritt. Ich bezweifle, dass es die 100 Themen und 1300 Aufgaben wesentlich sind eine Sackgasse zu bekommen. Mit einem kleineren Fall kann man wohl hinzufügen, um nur einige Debug-Drucke, die die Lösung des Problems genügend Informationen zur Verfügung stellen.
Ok, scheint das Problem in Ihrem Thread # Signal Methode. Was kann passieren, ist:
1 - Alle Ihre Arbeiter sind damit beschäftigt, und Sie versuchen, einen neuen Job zu verarbeiten
2 - Linie 90 erhält einen Null Arbeiter
3 - ein Arbeiter bekommen befreit und Signale, aber das Signal verloren geht, wie der Threadpool nicht darauf wartet
4 -. Sie fallen auf der Linie 95, auch warten, obwohl es ein freier Arbeiter
Hier Der Fehler ist, dass Sie einen freien Arbeiter signalisieren können, auch wenn niemand zuhört. Dieser Thread # Signal Methode sollte sein:
def signal
@mutex.synchronize { @cv.signal }
end
Und das Problem ist das gleiche in dem Worker-Objekt. Was kann passieren, ist:
1 - Der Arbeiter gerade eine Job
2 - Es wird überprüft (Linie 17), wenn es einen Job warten ist: Es ist nicht
3 - Der Thread-Pool einen neuen Job senden und Signale, die es ... aber das Signal verloren geht,
4 - Der Arbeiter wartet auf ein Signal, auch wenn es so gut besucht markiert
Sie sollten Ihre Methode initialize setzen, wie:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Als nächstes wird der Arbeiter # get_block und Worker # reset_block Methoden sollten nicht mehr synchronisiert werden. Auf diese Weise können Sie nicht um einen Block zu einem Arbeiter zwischen dem Test für einen Block und das Warten auf ein Signal zugeordnet werden.
Top Kommentator Code hat so viel über die Jahre geholfen. Hier ist es für Ruby 2.x aktualisiert und mit Gewinde Identifizierung verbessert. Wie ist das eine Verbesserung? Wenn jeder Thread eine ID hat, können Sie Threadpool mit einem Array zusammensetzen, die beliebige Informationen speichert. Einige Ideen:
- Keine Array: typische Threadpool-Nutzung. Auch bei der GIL macht es tot leicht zu Code und sehr nützlich für hohe Latenz-Anwendungen wie High-Volume-Web-Crawling-Threading,
- Threadpool und Array Größe Anzahl der CPUs: einfache Prozesse gabeln alle CPUs zu verwenden,
- Threadpool und Array bemessen Anzahl von Ressourcen. B. jedes Array-Element stellt einen Prozessor über einen Pool von Fällen, also wenn Sie 10 Instanzen mit jeweils 4 CPUs kann die TP Arbeit über 40 Teilprozesse verwalten
Mit diesen letzten zwei, anstatt darüber nachzudenken, Threads zu tun denkt Arbeit über die Thread Verwaltung Teilprozesse, die die Arbeit tun. Die Managementaufgabe ist leicht und, wenn sie mit Unterprozessen kombiniert, wer kümmert sich um die GIL.
Mit dieser Klasse können Sie einen Cluster basierend MapReduce in etwa hundert Zeilen Code codieren up! Dieser Code ist schön kurz, obwohl es ein bisschen wie ein Geist-bend sein kann, um voll grok. Hoffe, es hilft.
# Usage:
#
# Thread.abort_on_exception = true # help localize errors while debugging
# pool = ThreadPool.new(thread_pool_size)
# 50.times {|i|
# pool.process { ... }
# or
# pool.process {|id| ... } # worker identifies itself as id
# }
# pool.shutdown()
class ThreadPool
require 'thread'
class ThreadPoolWorker
attr_accessor :id
def initialize(thread_queue, id)
@id = id # worker id is exposed thru tp.process {|id| ... }
@mutex = Mutex.new
@cv = ConditionVariable.new
@idle_queue = thread_queue
@running = true
@block = nil
@thread = Thread.new {
@mutex.synchronize {
while @running
@cv.wait(@mutex) # block until there is work to do
if @block
@mutex.unlock
begin
@block.call(@id)
ensure
@mutex.lock
end
@block = nil
end
@idle_queue << self
end
}
}
end
def set_block(block)
@mutex.synchronize {
raise RuntimeError, "Thread is busy." if @block
@block = block
@cv.signal # notify thread in this class, there is work to be done
}
end
def busy?
@mutex.synchronize { ! @block.nil? }
end
def stop
@mutex.synchronize {
@running = false
@cv.signal
}
@thread.join
end
def name
@thread.inspect
end
end
attr_accessor :max_size, :queue
def initialize(max_size = 10)
@process_mutex = Mutex.new
@max_size = max_size
@queue = Queue.new # of idle workers
@workers = [] # array to hold workers
# construct workers
@max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
# queue up workers (workers in queue are idle and available to
# work). queue blocks if no workers are available.
@max_size.times {|i| @queue << @workers[i] }
sleep 1 # important to give threads a chance to initialize
end
def size
@workers.size
end
def idle
@queue.size
end
# are any threads idle
def busy?
# @queue.size < @workers.size
@queue.size == 0 && @workers.size == @max_size
end
# block until all threads finish
def shutdown
@workers.each {|w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block = nil, &blk)
@process_mutex.synchronize {
block = blk if block_given?
worker = @queue.pop # assign to next worker; block until one is ready
worker.set_block(block) # give code block to worker and tell it to start
}
end
end