Frage

Ich kann nicht eine anständige Thread Implementierung für Ruby finden, also schreibe ich mich (basierte zum Teil auf Code von hier: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show . / 3276 , sondern warten / Signal und andere Implementierung für Thread Abschaltung jedoch lief nach einiger Zeit (mit 100 Threads und Handhabung etwa 1300 Aufgaben) geändert wird, stirbt sie auf Linie mit Deadlock 25 - es für ein neues wartet Irgendwelche Ideen dort Arbeit geleistet., warum es könnte passieren?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
War es hilfreich?

Lösung

Ok, also das Hauptproblem bei der Umsetzung: Wie um sicherzustellen, dass kein Signal verloren und tot Sperren vermeiden

?

Nach meiner Erfahrung ist es wirklich schwer, mit Bedingungsvariablen und Mutex zu erreichen, aber leicht mit Semaphore. Nun ist es so, dass Ruby ein Objekt namens Queue (oder SizedQueue) implementieren, die das Problem lösen sollte. Hier ist meine vorgeschlagene Umsetzung:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

Und hier ist ein einfacher Test Code:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

Andere Tipps

Sie können versuchen, das work_queue Juwel, entworfen Arbeit zwischen einem Hersteller zu koordinieren und zu einem Pool von Arbeits Threads.

Ich bin hier etwas voreingenommen, aber ich würde vorschlagen, diese Sprache in einiger Prozessmodellierung und Modell es überprüfen. Frei verfügbare Tools sind zum Beispiel der mCRL2 Toolset (eine ACP-basierte Sprache verwendet wird), der Mobility-Workbench (pi-Kalkül) und Spin (Promela).

Ansonsten würde ich vorschlagen, jedes Stück Code zu entfernen, die nicht wesentlich für das Problem ist und einen minimalen Fall zu finden, wo der Stillstand auftritt. Ich bezweifle, dass es die 100 Themen und 1300 Aufgaben wesentlich sind eine Sackgasse zu bekommen. Mit einem kleineren Fall kann man wohl hinzufügen, um nur einige Debug-Drucke, die die Lösung des Problems genügend Informationen zur Verfügung stellen.

Ok, scheint das Problem in Ihrem Thread # Signal Methode. Was kann passieren, ist:

1 - Alle Ihre Arbeiter sind damit beschäftigt, und Sie versuchen, einen neuen Job zu verarbeiten

2 - Linie 90 erhält einen Null Arbeiter

3 - ein Arbeiter bekommen befreit und Signale, aber das Signal verloren geht, wie der Threadpool nicht darauf wartet

4 -. Sie fallen auf der Linie 95, auch warten, obwohl es ein freier Arbeiter

Hier Der Fehler ist, dass Sie einen freien Arbeiter signalisieren können, auch wenn niemand zuhört. Dieser Thread # Signal Methode sollte sein:

def signal
     @mutex.synchronize { @cv.signal }
end

Und das Problem ist das gleiche in dem Worker-Objekt. Was kann passieren, ist:

1 - Der Arbeiter gerade eine Job

2 - Es wird überprüft (Linie 17), wenn es einen Job warten ist: Es ist nicht

3 - Der Thread-Pool einen neuen Job senden und Signale, die es ... aber das Signal verloren geht,

4 - Der Arbeiter wartet auf ein Signal, auch wenn es so gut besucht markiert

Sie sollten Ihre Methode initialize setzen, wie:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

Als nächstes wird der Arbeiter # get_block und Worker # reset_block Methoden sollten nicht mehr synchronisiert werden. Auf diese Weise können Sie nicht um einen Block zu einem Arbeiter zwischen dem Test für einen Block und das Warten auf ein Signal zugeordnet werden.

Top Kommentator Code hat so viel über die Jahre geholfen. Hier ist es für Ruby 2.x aktualisiert und mit Gewinde Identifizierung verbessert. Wie ist das eine Verbesserung? Wenn jeder Thread eine ID hat, können Sie Threadpool mit einem Array zusammensetzen, die beliebige Informationen speichert. Einige Ideen:

  • Keine Array: typische Threadpool-Nutzung. Auch bei der GIL macht es tot leicht zu Code und sehr nützlich für hohe Latenz-Anwendungen wie High-Volume-Web-Crawling-Threading,
  • Threadpool und Array Größe Anzahl der CPUs: einfache Prozesse gabeln alle CPUs zu verwenden,
  • Threadpool und Array bemessen Anzahl von Ressourcen. B. jedes Array-Element stellt einen Prozessor über einen Pool von Fällen, also wenn Sie 10 Instanzen mit jeweils 4 CPUs kann die TP Arbeit über 40 Teilprozesse verwalten

Mit diesen letzten zwei, anstatt darüber nachzudenken, Threads zu tun denkt Arbeit über die Thread Verwaltung Teilprozesse, die die Arbeit tun. Die Managementaufgabe ist leicht und, wenn sie mit Unterprozessen kombiniert, wer kümmert sich um die GIL.

Mit dieser Klasse können Sie einen Cluster basierend MapReduce in etwa hundert Zeilen Code codieren up! Dieser Code ist schön kurz, obwohl es ein bisschen wie ein Geist-bend sein kann, um voll grok. Hoffe, es hilft.

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top