سؤال

لم أتمكن من العثور على تطبيق ThreadPool مناسب لـ Ruby، لذلك كتبت تطبيقي (يعتمد جزئيًا على الكود من هنا: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 ، ولكن تم تغييره إلى الانتظار/الإشارة والتنفيذ الآخر لإيقاف تشغيل ThreadPool.ولكن بعد مرور بعض الوقت من التشغيل (وجود 100 موضوع والتعامل مع حوالي 1300 مهمة)، يموت في حالة توقف تام على السطر 25 - فهو ينتظر وظيفة جديدة هناك.أي أفكار، لماذا قد يحدث؟

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
هل كانت مفيدة؟

المحلول

حسنًا، المشكلة الرئيسية في التنفيذ هي:كيفية التأكد من عدم فقدان أي إشارة وتجنب الأقفال الميتة؟

في تجربتي، من الصعب حقًا تحقيق ذلك باستخدام متغيرات الحالة وكائن المزامنة (mutex)، ولكنه سهل باستخدام الإشارات.يحدث أن تقوم روبي بتنفيذ كائن يسمى قائمة الانتظار (أو SizedQueue) والذي يجب أن يحل المشكلة.هنا هو التنفيذ المقترح:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

وهنا رمز اختبار بسيط:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

نصائح أخرى

يمكنك تجربة Work_queue جوهرة، مصممة لتنسيق العمل بين المنتج ومجموعة من خيوط العمل.

أنا متحيز بعض الشيء هنا، لكني أقترح نمذجة ذلك في بعض لغات العمليات والتحقق من النموذج.الأدوات المتاحة مجانًا، على سبيل المثال، مجموعة أدوات mCRL2 (باستخدام لغة تعتمد على ACP)، وMobility Workbench (pi-calculus)، وSpin (PROMELA).

بخلاف ذلك، أود أن أقترح إزالة كل جزء من التعليمات البرمجية غير الضرورية للمشكلة وإيجاد الحد الأدنى من الحالة التي يحدث فيها حالة توقف تام.أشك في أن 100 موضوع و1300 مهمة ضرورية للوصول إلى طريق مسدود.مع حالة أصغر، ربما يمكنك فقط إضافة بعض مطبوعات تصحيح الأخطاء التي توفر معلومات كافية لحل المشكلة.

حسنًا، يبدو أن المشكلة تكمن في طريقة ThreadPool#signal.ما قد يحدث هو:

1- كل العاملين لديك مشغولون وتحاول إنجاز عمل جديد

2 - السطر 90 يحصل على عامل صفر

3- يتم تحرير العامل وإرسال الإشارة إليه ولكن يتم فقدان الإشارة لأن ThreadPool لا ينتظرها

4- تقع على الخط 95 تنتظر رغم وجود عامل مجاني.

الخطأ هنا هو أنه يمكنك الإشارة إلى عامل مجاني حتى عندما لا يستمع أحد.يجب أن تكون طريقة ThreadPool#signal هذه:

def signal
     @mutex.synchronize { @cv.signal }
end

والمشكلة هي نفسها في الكائن العامل.ما قد يحدث هو:

1- أن العامل أنهى عمله للتو

2 - يقوم بالتحقق (السطر 17) إذا كان هناك وظيفة تنتظر:لا يوجد

3 - يقوم تجمع الخيوط بإرسال مهمة جديدة ويشير إليها ...لكن الإشارة مفقودة

4- ينتظر العامل الإشارة حتى لو كانت مشغولة

يجب عليك وضع طريقة التهيئة الخاصة بك على النحو التالي:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

بعد ذلك، يجب ألا تتم مزامنة طريقتي Worker#get_block وWorker#reset_block بعد الآن.بهذه الطريقة، لا يمكنك تعيين كتلة للعامل بين اختبار الكتلة وانتظار الإشارة.

لقد ساعد رمز أفضل المعلقين كثيرًا على مر السنين.هنا يتم تحديثه لـ Ruby 2.x وتحسينه من خلال تحديد الخيط.كيف يكون ذلك تحسنا؟عندما يكون لكل مؤشر ترابط معرف، يمكنك إنشاء ThreadPool باستخدام مصفوفة تقوم بتخزين معلومات عشوائية.بعض الافكار:

  • لا يوجد مصفوفة:استخدام ThreadPool النموذجي.حتى مع GIL، فإنه يجعل عملية الترابط سهلة الترميز ومفيدة جدًا للتطبيقات ذات زمن الوصول العالي مثل الزحف على الويب بكميات كبيرة،
  • تم تحديد حجم ThreadPool وArray حسب عدد وحدات المعالجة المركزية (CPU):من السهل تفرع العمليات لاستخدام جميع وحدات المعالجة المركزية (CPUs)،
  • تم تحديد حجم ThreadPool وArray حسب عدد الموارد:على سبيل المثال، يمثل كل عنصر مصفوفة معالجًا واحدًا عبر مجموعة من المثيلات، لذلك إذا كان لديك 10 مثيلات تحتوي كل منها على 4 وحدات معالجة مركزية، فيمكن لـ TP إدارة العمل عبر 40 عملية فرعية.

مع هذين الأخيرين، بدلاً من التفكير في سلاسل الرسائل التي تعمل، فكر في إدارة ThreadPool للعمليات الفرعية التي تقوم بهذا العمل.مهمة الإدارة خفيفة الوزن وعندما يتم دمجها مع العمليات الفرعية، من يهتم بـ GIL.

باستخدام هذه الفئة، يمكنك ترميز MapReduce القائم على المجموعة في حوالي مائة سطر من التعليمات البرمجية!هذا الرمز قصير بشكل جميل على الرغم من أنه قد يكون منحنيًا قليلاً للعقل بشكل كامل.نأمل أن يساعد.

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top