Question

Je ne pouvais pas trouver une implémentation ThreadPool décente pour Ruby, alors j’ai écrit la mienne (basée en partie sur du code d’ici: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show / 3276 , mais modifié en attente / signal et autre implémentation pour l’arrêt de ThreadPool. Toutefois, après un certain temps d’exécution (100 threads traités et environ 1 300 tâches), il meurt avec blocage dans la ligne 25 - attend une nouvelle Il y a des idées, pourquoi cela pourrait-il arriver?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
Était-ce utile?

La solution

Ok, le principal problème de cette implémentation est le suivant: comment s’assurer qu’aucun signal n’est perdu et éviter les blocages??

Selon mon expérience, c’est VRAIMENT difficile à obtenir avec les variables de condition et le mutex, mais facile avec les sémaphores. Il se trouve que ruby ??implémente un objet appelé Queue (ou SizedQueue) qui devrait résoudre le problème. Voici ma suggestion d'implémentation:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

Et voici un code de test simple:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

Autres conseils

Vous pouvez essayer le work_queue , conçu pour coordonner le travail entre un producteur et un pool de travailleurs. discussions.

Je suis légèrement biaisé ici, mais je suggérerais de modéliser ceci dans un langage de processus et de le vérifier par modèle. Les outils disponibles gratuitement sont, par exemple, le jeu d’outils mCRL2 (utilisant un langage basé sur ACP), Mobility Workbench (pi-calcul) et Spin (PROMELA).

Sinon, je suggérerais de supprimer tous les éléments de code qui ne sont pas essentiels au problème et de rechercher un cas minimal où l'impasse se produirait. Je doute que les 100 threads et les 1300 tâches soient essentiels pour obtenir une impasse. Avec un cas plus petit, vous pouvez probablement simplement ajouter des impressions de débogage fournissant suffisamment d’informations pour résoudre le problème.

Ok, le problème semble être lié à votre méthode de signal ThreadPool #. Ce qui peut arriver est:

1 - Tous vos employés sont occupés et vous essayez de traiter un nouvel emploi

2 - la ligne 90 obtient un ouvrier nul

3 - un opérateur est libéré et le signale, mais le signal est perdu car le ThreadPool ne l'attend pas

4 - vous tombez sur la ligne 95 et attendez même s'il y a un travailleur libre.

L’erreur ici est que vous pouvez signaler un ouvrier libre même lorsque personne n’écoute. Cette méthode de signal ThreadPool # devrait être:

def signal
     @mutex.synchronize { @cv.signal }
end

Et le problème est le même dans l'objet Worker. Ce qui pourrait arriver est:

1 - Le travailleur vient de terminer un travail

2 - Il vérifie (ligne 17) s'il y a un travail en attente: il n'y en a pas

3 - Le pool de threads envoie un nouveau travail et le signale ... mais le signal est perdu

4 - Le travailleur attend un signal, même s'il est marqué comme occupé

Vous devez définir votre méthode d'initialisation comme suit:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

Ensuite, les méthodes Worker # get_block et Worker # reset_block ne doivent plus être synchronisées. De cette façon, vous ne pouvez pas affecter de bloc à un opérateur entre le test de bloc et l’attente d’un signal.

Le code du meilleur commentateur a beaucoup aidé au fil des ans. Ici, il est mis à jour pour ruby ??2.x et amélioré avec l'identification des threads. Comment est-ce une amélioration? Lorsque chaque thread a un identifiant, vous pouvez composer ThreadPool avec un tableau stockant des informations arbitraires. Quelques idées:

  • Pas de tableau: utilisation typique de ThreadPool. Même avec le GIL, le threading est facile à coder et très utile pour les applications à latence élevée telles que l’analyse Web à volume élevé,
  • ThreadPool et Array dimensionnés en fonction du nombre de processeurs: processus faciles à transformer pour utiliser tous les processeurs,
  • ThreadPool et Array dimensionnés en nombre de ressources: chaque élément de tableau, par exemple, représente un processeur dans un pool d'instances. Par conséquent, si vous disposez de 10 instances avec 4 processeurs, le TP peut gérer le travail en 40 sous-processus.

Avec ces deux derniers, plutôt que de penser aux threads effectuant un travail, pensez au ThreadPool qui gère les sous-processus qui effectuent le travail. La tâche de gestion est légère et combinée à des sous-processus, qui se soucie de la GIL.

Avec cette classe, vous pouvez coder MapReduce en cluster en une centaine de lignes de code! Ce code est magnifiquement court, bien que cela puisse être un peu compliqué d’être complètement réussi. J'espère que ça aide.

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top