حالة توقف تام في ThreadPool
-
09-06-2019 - |
سؤال
لم أتمكن من العثور على تطبيق ThreadPool مناسب لـ Ruby، لذلك كتبت تطبيقي (يعتمد جزئيًا على الكود من هنا: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 ، ولكن تم تغييره إلى الانتظار/الإشارة والتنفيذ الآخر لإيقاف تشغيل ThreadPool.ولكن بعد مرور بعض الوقت من التشغيل (وجود 100 موضوع والتعامل مع حوالي 1300 مهمة)، يموت في حالة توقف تام على السطر 25 - فهو ينتظر وظيفة جديدة هناك.أي أفكار، لماذا قد يحدث؟
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
المحلول
حسنًا، المشكلة الرئيسية في التنفيذ هي:كيفية التأكد من عدم فقدان أي إشارة وتجنب الأقفال الميتة؟
في تجربتي، من الصعب حقًا تحقيق ذلك باستخدام متغيرات الحالة وكائن المزامنة (mutex)، ولكنه سهل باستخدام الإشارات.يحدث أن تقوم روبي بتنفيذ كائن يسمى قائمة الانتظار (أو SizedQueue) والذي يجب أن يحل المشكلة.هنا هو التنفيذ المقترح:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there's a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { !@block.nil? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if !@queue.empty? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
وهنا رمز اختبار بسيط:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
نصائح أخرى
يمكنك تجربة Work_queue جوهرة، مصممة لتنسيق العمل بين المنتج ومجموعة من خيوط العمل.
أنا متحيز بعض الشيء هنا، لكني أقترح نمذجة ذلك في بعض لغات العمليات والتحقق من النموذج.الأدوات المتاحة مجانًا، على سبيل المثال، مجموعة أدوات mCRL2 (باستخدام لغة تعتمد على ACP)، وMobility Workbench (pi-calculus)، وSpin (PROMELA).
بخلاف ذلك، أود أن أقترح إزالة كل جزء من التعليمات البرمجية غير الضرورية للمشكلة وإيجاد الحد الأدنى من الحالة التي يحدث فيها حالة توقف تام.أشك في أن 100 موضوع و1300 مهمة ضرورية للوصول إلى طريق مسدود.مع حالة أصغر، ربما يمكنك فقط إضافة بعض مطبوعات تصحيح الأخطاء التي توفر معلومات كافية لحل المشكلة.
حسنًا، يبدو أن المشكلة تكمن في طريقة ThreadPool#signal.ما قد يحدث هو:
1- كل العاملين لديك مشغولون وتحاول إنجاز عمل جديد
2 - السطر 90 يحصل على عامل صفر
3- يتم تحرير العامل وإرسال الإشارة إليه ولكن يتم فقدان الإشارة لأن ThreadPool لا ينتظرها
4- تقع على الخط 95 تنتظر رغم وجود عامل مجاني.
الخطأ هنا هو أنه يمكنك الإشارة إلى عامل مجاني حتى عندما لا يستمع أحد.يجب أن تكون طريقة ThreadPool#signal هذه:
def signal
@mutex.synchronize { @cv.signal }
end
والمشكلة هي نفسها في الكائن العامل.ما قد يحدث هو:
1- أن العامل أنهى عمله للتو
2 - يقوم بالتحقق (السطر 17) إذا كان هناك وظيفة تنتظر:لا يوجد
3 - يقوم تجمع الخيوط بإرسال مهمة جديدة ويشير إليها ...لكن الإشارة مفقودة
4- ينتظر العامل الإشارة حتى لو كانت مشغولة
يجب عليك وضع طريقة التهيئة الخاصة بك على النحو التالي:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
بعد ذلك، يجب ألا تتم مزامنة طريقتي Worker#get_block وWorker#reset_block بعد الآن.بهذه الطريقة، لا يمكنك تعيين كتلة للعامل بين اختبار الكتلة وانتظار الإشارة.
لقد ساعد رمز أفضل المعلقين كثيرًا على مر السنين.هنا يتم تحديثه لـ Ruby 2.x وتحسينه من خلال تحديد الخيط.كيف يكون ذلك تحسنا؟عندما يكون لكل مؤشر ترابط معرف، يمكنك إنشاء ThreadPool باستخدام مصفوفة تقوم بتخزين معلومات عشوائية.بعض الافكار:
- لا يوجد مصفوفة:استخدام ThreadPool النموذجي.حتى مع GIL، فإنه يجعل عملية الترابط سهلة الترميز ومفيدة جدًا للتطبيقات ذات زمن الوصول العالي مثل الزحف على الويب بكميات كبيرة،
- تم تحديد حجم ThreadPool وArray حسب عدد وحدات المعالجة المركزية (CPU):من السهل تفرع العمليات لاستخدام جميع وحدات المعالجة المركزية (CPUs)،
- تم تحديد حجم ThreadPool وArray حسب عدد الموارد:على سبيل المثال، يمثل كل عنصر مصفوفة معالجًا واحدًا عبر مجموعة من المثيلات، لذلك إذا كان لديك 10 مثيلات تحتوي كل منها على 4 وحدات معالجة مركزية، فيمكن لـ TP إدارة العمل عبر 40 عملية فرعية.
مع هذين الأخيرين، بدلاً من التفكير في سلاسل الرسائل التي تعمل، فكر في إدارة ThreadPool للعمليات الفرعية التي تقوم بهذا العمل.مهمة الإدارة خفيفة الوزن وعندما يتم دمجها مع العمليات الفرعية، من يهتم بـ GIL.
باستخدام هذه الفئة، يمكنك ترميز MapReduce القائم على المجموعة في حوالي مائة سطر من التعليمات البرمجية!هذا الرمز قصير بشكل جميل على الرغم من أنه قد يكون منحنيًا قليلاً للعقل بشكل كامل.نأمل أن يساعد.
# Usage:
#
# Thread.abort_on_exception = true # help localize errors while debugging
# pool = ThreadPool.new(thread_pool_size)
# 50.times {|i|
# pool.process { ... }
# or
# pool.process {|id| ... } # worker identifies itself as id
# }
# pool.shutdown()
class ThreadPool
require 'thread'
class ThreadPoolWorker
attr_accessor :id
def initialize(thread_queue, id)
@id = id # worker id is exposed thru tp.process {|id| ... }
@mutex = Mutex.new
@cv = ConditionVariable.new
@idle_queue = thread_queue
@running = true
@block = nil
@thread = Thread.new {
@mutex.synchronize {
while @running
@cv.wait(@mutex) # block until there is work to do
if @block
@mutex.unlock
begin
@block.call(@id)
ensure
@mutex.lock
end
@block = nil
end
@idle_queue << self
end
}
}
end
def set_block(block)
@mutex.synchronize {
raise RuntimeError, "Thread is busy." if @block
@block = block
@cv.signal # notify thread in this class, there is work to be done
}
end
def busy?
@mutex.synchronize { ! @block.nil? }
end
def stop
@mutex.synchronize {
@running = false
@cv.signal
}
@thread.join
end
def name
@thread.inspect
end
end
attr_accessor :max_size, :queue
def initialize(max_size = 10)
@process_mutex = Mutex.new
@max_size = max_size
@queue = Queue.new # of idle workers
@workers = [] # array to hold workers
# construct workers
@max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
# queue up workers (workers in queue are idle and available to
# work). queue blocks if no workers are available.
@max_size.times {|i| @queue << @workers[i] }
sleep 1 # important to give threads a chance to initialize
end
def size
@workers.size
end
def idle
@queue.size
end
# are any threads idle
def busy?
# @queue.size < @workers.size
@queue.size == 0 && @workers.size == @max_size
end
# block until all threads finish
def shutdown
@workers.each {|w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block = nil, &blk)
@process_mutex.synchronize {
block = blk if block_given?
worker = @queue.pop # assign to next worker; block until one is ready
worker.set_block(block) # give code block to worker and tell it to start
}
end
end