Vra

Ek kon nie 'n ordentlike ThreadPool implementering vir Ruby kry, so ek het myne (wat gebaseer is deels op-kode van hier af: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show . / 3276 , maar verander om te wag / sein en ander implementering vir ThreadPool afsluit maar na 'n rukkie van loop (met 100 drade en die hantering van ongeveer 1300 take), dit sterf met dooiepunt op lyn 25 - dit wag vir 'n nuwe werk daar. Enige idees, hoekom dit kan gebeur?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
Was dit nuttig?

Oplossing

Ok, so die grootste probleem met die implementering is: hoe om seker te maak geen sein verloor en vermy dooie slotte

?

In my ervaring, dit is werklik moeilik om te bereik met voorwaarde veranderlikes en Mutex, maar maklik met semafore. Dit gebeur so dat Ruby implementeer 'n voorwerp genoem Queue (of SizedQueue) dat die probleem op te los. Hier is my voorgestelde implementering:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

En hier is 'n eenvoudige toets kode:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

Ander wenke

Jy kan probeer om die work_queue juweel, wat ontwerp is om die werk te koördineer tussen 'n produsent en 'n poel van werker drade.

Ek is effens hier bevooroordeeld, maar ek sou raai modellering hierdie in 'n proses taal en model check it. Vrylik beskikbaar gereedskap is, byvoorbeeld, die mCRL2 toolset (met behulp van 'n ACP-gebaseerde taal), die mobiliteit Werksbank (pi-analise) en Spin (PROMELA).

Anders Ek sou raai die verwydering van elke stukkie kode wat nie noodsaaklik is om die probleem en vind 'n minimale geval waar die dooiepunt ontstaan. Ek twyfel dat dit die 100 drade en 1300 take is noodsaaklik om 'n dooiepunt kry. Met 'n kleiner geval jy kan waarskynlik net 'n bietjie debug afdrukke wat genoeg inligting die oplossing van die probleem te verskaf.

Ok, die probleem blyk te wees in jou ThreadPool # sein metode. Wat kan gebeur is:

1 - Al jou werker is besig en jy probeer om 'n nuwe werk te verwerk

2 - lyn 90 kry 'n nul werker

3 - 'n werker ontslae bevry en seine dit, maar die sein verloor as die ThreadPool nie wag vir dit

4 -. Jy val op lyn 95, wag, selfs al is daar 'n gratis werker

Die fout hier is dat jy 'n gratis werker selfs wanneer niemand luister kan aandui. Dit ThreadPool # sein metode moet wees:

def signal
     @mutex.synchronize { @cv.signal }
end

En die probleem is dieselfde in die Werker voorwerp. Wat kan gebeur is:

1 - Die werker pas 'n werk

2 - Dit tjeks (lyn 17) as daar 'n werk wag: daar is nie

3 - Die draad swembad stuur 'n nuwe werk en seine dit ... maar die sein verloor

4 - Die werker wag vir 'n sein, selfs al is dit is gemerk as besig

Jy moet jou inisialiseer metode sit as:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

Volgende, die Werker # get_block en Werker # reset_block metodes moet nie gesinchroniseer word nie. Op dié manier kan jy nie 'n blok aan 'n werker tussen die toets vir 'n blok en die wag vir 'n sein.

Top opmerkings se kode het gehelp om uit soveel oor die jare. Hier word opgedateer vir Ruby 2.x en verbeter met draad identifikasie. Hoe is dit 'n verbetering? Wanneer elke draad het 'n ID, kan jy ThreadPool komponeer met 'n verskeidenheid wat arbitrêre inligting stoor. 'N paar idees:

  • Geen skikking: tipiese ThreadPool gebruik. Selfs met die GIL dit maak threading dood maklik om te kode en baie nuttig vir 'n hoë-latency programme soos hoë-volume web deur soek,
  • ThreadPool en Array grootte om verskeie CPUs: maklik om prosesse vurk om al CPUs gebruik,
  • ThreadPool en Array grootte om verskeie bronne:. Bv elke verskeidenheid element verteenwoordig een verwerker oor 'n poel van gevalle, so as jy 10 gevalle elk met 4 CPUs, kan die TP werk oor 40 subprosesse bestuur

Met hierdie laaste twee, eerder as om te dink oor gesprekke wat doen die werk te dink oor die ThreadPool besturende subprosesse wat doen die werk. Die bestuurstaak is liggewig en wanneer dit gekombineer met subprosesse, wat omgee vir die GIL.

Met hierdie klas, kan jy die kode op 'n cluster gebaseer MapReduce in oor 'n honderd lyne kode! Hierdie kode is pragtig kort hoewel dit kan 'n bietjie van 'n mind-buiging ten volle grok. Hoop dit help.

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
Gelisensieer onder: CC-BY-SA met toeskrywing
Nie verbonde aan StackOverflow
scroll top