Dooiepunt in ThreadPool
-
09-06-2019 - |
Vra
Ek kon nie 'n ordentlike ThreadPool implementering vir Ruby kry, so ek het myne (wat gebaseer is deels op-kode van hier af: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show . / 3276 , maar verander om te wag / sein en ander implementering vir ThreadPool afsluit maar na 'n rukkie van loop (met 100 drade en die hantering van ongeveer 1300 take), dit sterf met dooiepunt op lyn 25 - dit wag vir 'n nuwe werk daar. Enige idees, hoekom dit kan gebeur?
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
Oplossing
Ok, so die grootste probleem met die implementering is: hoe om seker te maak geen sein verloor en vermy dooie slotte
?In my ervaring, dit is werklik moeilik om te bereik met voorwaarde veranderlikes en Mutex, maar maklik met semafore. Dit gebeur so dat Ruby implementeer 'n voorwerp genoem Queue (of SizedQueue) dat die probleem op te los. Hier is my voorgestelde implementering:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
En hier is 'n eenvoudige toets kode:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Ander wenke
Jy kan probeer om die work_queue juweel, wat ontwerp is om die werk te koördineer tussen 'n produsent en 'n poel van werker drade.
Ek is effens hier bevooroordeeld, maar ek sou raai modellering hierdie in 'n proses taal en model check it. Vrylik beskikbaar gereedskap is, byvoorbeeld, die mCRL2 toolset (met behulp van 'n ACP-gebaseerde taal), die mobiliteit Werksbank (pi-analise) en Spin (PROMELA).
Anders Ek sou raai die verwydering van elke stukkie kode wat nie noodsaaklik is om die probleem en vind 'n minimale geval waar die dooiepunt ontstaan. Ek twyfel dat dit die 100 drade en 1300 take is noodsaaklik om 'n dooiepunt kry. Met 'n kleiner geval jy kan waarskynlik net 'n bietjie debug afdrukke wat genoeg inligting die oplossing van die probleem te verskaf.
Ok, die probleem blyk te wees in jou ThreadPool # sein metode. Wat kan gebeur is:
1 - Al jou werker is besig en jy probeer om 'n nuwe werk te verwerk
2 - lyn 90 kry 'n nul werker
3 - 'n werker ontslae bevry en seine dit, maar die sein verloor as die ThreadPool nie wag vir dit
4 -. Jy val op lyn 95, wag, selfs al is daar 'n gratis werker
Die fout hier is dat jy 'n gratis werker selfs wanneer niemand luister kan aandui. Dit ThreadPool # sein metode moet wees:
def signal
@mutex.synchronize { @cv.signal }
end
En die probleem is dieselfde in die Werker voorwerp. Wat kan gebeur is:
1 - Die werker pas 'n werk
2 - Dit tjeks (lyn 17) as daar 'n werk wag: daar is nie
3 - Die draad swembad stuur 'n nuwe werk en seine dit ... maar die sein verloor
4 - Die werker wag vir 'n sein, selfs al is dit is gemerk as besig
Jy moet jou inisialiseer metode sit as:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Volgende, die Werker # get_block en Werker # reset_block metodes moet nie gesinchroniseer word nie. Op dié manier kan jy nie 'n blok aan 'n werker tussen die toets vir 'n blok en die wag vir 'n sein.
Top opmerkings se kode het gehelp om uit soveel oor die jare. Hier word opgedateer vir Ruby 2.x en verbeter met draad identifikasie. Hoe is dit 'n verbetering? Wanneer elke draad het 'n ID, kan jy ThreadPool komponeer met 'n verskeidenheid wat arbitrêre inligting stoor. 'N paar idees:
- Geen skikking: tipiese ThreadPool gebruik. Selfs met die GIL dit maak threading dood maklik om te kode en baie nuttig vir 'n hoë-latency programme soos hoë-volume web deur soek,
- ThreadPool en Array grootte om verskeie CPUs: maklik om prosesse vurk om al CPUs gebruik,
- ThreadPool en Array grootte om verskeie bronne:. Bv elke verskeidenheid element verteenwoordig een verwerker oor 'n poel van gevalle, so as jy 10 gevalle elk met 4 CPUs, kan die TP werk oor 40 subprosesse bestuur
Met hierdie laaste twee, eerder as om te dink oor gesprekke wat doen die werk te dink oor die ThreadPool besturende subprosesse wat doen die werk. Die bestuurstaak is liggewig en wanneer dit gekombineer met subprosesse, wat omgee vir die GIL.
Met hierdie klas, kan jy die kode op 'n cluster gebaseer MapReduce in oor 'n honderd lyne kode! Hierdie kode is pragtig kort hoewel dit kan 'n bietjie van 'n mind-buiging ten volle grok. Hoop dit help.
# Usage:
#
# Thread.abort_on_exception = true # help localize errors while debugging
# pool = ThreadPool.new(thread_pool_size)
# 50.times {|i|
# pool.process { ... }
# or
# pool.process {|id| ... } # worker identifies itself as id
# }
# pool.shutdown()
class ThreadPool
require 'thread'
class ThreadPoolWorker
attr_accessor :id
def initialize(thread_queue, id)
@id = id # worker id is exposed thru tp.process {|id| ... }
@mutex = Mutex.new
@cv = ConditionVariable.new
@idle_queue = thread_queue
@running = true
@block = nil
@thread = Thread.new {
@mutex.synchronize {
while @running
@cv.wait(@mutex) # block until there is work to do
if @block
@mutex.unlock
begin
@block.call(@id)
ensure
@mutex.lock
end
@block = nil
end
@idle_queue << self
end
}
}
end
def set_block(block)
@mutex.synchronize {
raise RuntimeError, "Thread is busy." if @block
@block = block
@cv.signal # notify thread in this class, there is work to be done
}
end
def busy?
@mutex.synchronize { ! @block.nil? }
end
def stop
@mutex.synchronize {
@running = false
@cv.signal
}
@thread.join
end
def name
@thread.inspect
end
end
attr_accessor :max_size, :queue
def initialize(max_size = 10)
@process_mutex = Mutex.new
@max_size = max_size
@queue = Queue.new # of idle workers
@workers = [] # array to hold workers
# construct workers
@max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
# queue up workers (workers in queue are idle and available to
# work). queue blocks if no workers are available.
@max_size.times {|i| @queue << @workers[i] }
sleep 1 # important to give threads a chance to initialize
end
def size
@workers.size
end
def idle
@queue.size
end
# are any threads idle
def busy?
# @queue.size < @workers.size
@queue.size == 0 && @workers.size == @max_size
end
# block until all threads finish
def shutdown
@workers.each {|w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block = nil, &blk)
@process_mutex.synchronize {
block = blk if block_given?
worker = @queue.pop # assign to next worker; block until one is ready
worker.set_block(block) # give code block to worker and tell it to start
}
end
end