Pregunta

No pude encontrar una implementación ThreadPool decente para Ruby, así que escribí la mía (basada en parte en el código de aquí: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 , pero se cambió a esperar/señalar y otra implementación para el cierre de ThreadPool.Sin embargo, después de un tiempo de ejecución (con 100 subprocesos y manejando alrededor de 1300 tareas), muere con un punto muerto en la línea 25: espera un nuevo trabajo allí.¿Alguna idea de por qué podría suceder?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
¿Fue útil?

Solución

Bien, entonces el principal problema con la implementación es:¿Cómo asegurarse de que no se pierda ninguna señal y evitar bloqueos?

En mi experiencia, esto es REALMENTE difícil de lograr con variables de condición y mutex, pero fácil con semáforos.Sucede que Ruby implementa un objeto llamado Queue (o SizedQueue) que debería resolver el problema.Aquí está mi implementación sugerida:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

Y aquí hay un código de prueba simple:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

Otros consejos

Puedes probar el cola_trabajo gema, diseñada para coordinar el trabajo entre un productor y un grupo de subprocesos de trabajo.

Soy un poco parcial aquí, pero sugeriría modelar esto en algún lenguaje de proceso y verificarlo.Las herramientas disponibles gratuitamente son, por ejemplo, el conjunto de herramientas mCRL2 (que utiliza un lenguaje basado en ACP), Mobility Workbench (pi-calculus) y Spin (PROMELA).

De lo contrario, sugeriría eliminar todo el código que no sea esencial para el problema y encontrar un caso mínimo en el que se produzca el punto muerto.Dudo que los 100 subprocesos y las 1300 tareas sean esenciales para llegar a un punto muerto.Con un caso más pequeño, probablemente pueda agregar algunas impresiones de depuración que proporcionen suficiente información para resolver el problema.

Ok, el problema parece estar en tu método ThreadPool#signal.Lo que puede pasar es:

1 - Todos tus trabajadores están ocupados e intentas procesar un nuevo trabajo.

2 - la línea 90 obtiene un trabajador nulo

3: un trabajador se libera y lo señala, pero la señal se pierde porque ThreadPool no la está esperando.

4 - caes en la línea 95, esperando aunque haya un trabajador libre.

El error aquí es que puedes señalar a un trabajador libre incluso cuando nadie está escuchando.Este método ThreadPool#signal debería ser:

def signal
     @mutex.synchronize { @cv.signal }
end

Y el problema es el mismo en el objeto Trabajador.Lo que podría pasar es:

1 - El trabajador acaba de completar un trabajo

2 - Comprueba (línea 17) si hay un trabajo en espera:no hay

3 - El grupo de subprocesos envía un nuevo trabajo y lo señala...pero se pierde la señal

4 - El trabajador espera señal, aunque esté marcado como ocupado

Deberías poner tu método de inicialización como:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

A continuación, los métodos Worker#get_block y Worker#reset_block ya no deberían sincronizarse.De esa manera, no puede asignar un bloque a un trabajador entre la prueba de un bloque y la espera de una señal.

El código de los principales comentaristas ha sido de gran ayuda a lo largo de los años.Aquí está actualizado para Ruby 2.x y mejorado con identificación de subprocesos.¿Cómo es eso una mejora?Cuando cada hilo tiene una ID, puede componer ThreadPool con una matriz que almacena información arbitraria.Algunas ideas:

  • Sin matriz:Uso típico de ThreadPool.Incluso con GIL, hace que los subprocesos sean muy fáciles de codificar y muy útiles para aplicaciones de alta latencia como el rastreo web de gran volumen.
  • ThreadPool y Array dimensionados según el número de CPU:procesos fáciles de bifurcar para usar todas las CPU,
  • ThreadPool y Array dimensionados según la cantidad de recursos:por ejemplo, cada elemento de la matriz representa un procesador en un grupo de instancias, por lo que si tiene 10 instancias cada una con 4 CPU, el TP puede administrar el trabajo en 40 subprocesos.

Con estos dos últimos, en lugar de pensar en los subprocesos que realizan el trabajo, piense en ThreadPool que administra los subprocesos que están realizando el trabajo.La tarea de gestión es liviana y cuando se combina con subprocesos, a quién le importa el GIL.

¡Con esta clase, puede codificar un MapReduce basado en clúster en aproximadamente cien líneas de código!Este código es maravillosamente breve, aunque puede resultar un poco complicado asimilarlo por completo.Espero eso ayude.

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top