質問

が見出せなかったのであThreadPool実施のためのRubyで書いた鉱山(一部コードからもアクセスできます。 http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 が変わるのを待/信号およびその他の実施のためのThreadPool閉鎖される事態となりました。しかしその後の走行(100スレッドの取り扱いについて1300タスク)で金型の行き詰まりの線で25でお待ちしておりますので新しい仕事あります。そのアイデア、そんな失業が生まれるのでしょうか。

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
役に立ちましたか?

解決

Okなので、主な問題の実施には:どのような信号が消失を回避する死ロック?

私の経験とかのリクとかその他もろもろは難しその達成状態変数およびミューテックスが容易セマフォ.折からrubyの実施と呼ばれるオブジェクトキュー(またはSizedQueueるべき問題ない。ここでは私の提案の実施

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

これからも、簡単なテストコード:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

他のヒント

体験してみましょう work_queue 逸品は、コーディネートとプロデューサー、プールの労働者のスレッド)。

私はやや偏ったが私はあなたがこのモデル化の一部処理言語モデルのチェックします。自由に利用可能なツールは、例えば、mCRL2のパワフルなツールセットを使用してACPに基づく言語)に移動ワークベンチ(pi-微積分)とスピン(PROMELA).

場を提供している除去のプロジェクトをコードするには欠かせない問題を発見し、最小限の場合は、行き詰まりが発生します。うるものか疑問では100スレッド1300業務に必要な取得が難航している。小さな場合にはできるのではないでしょうかだけで追加のデバッグ版画を提供する十分な情報に問題ない。

本当は、問題になっているようごThreadPool#信号方法。起こることが

1-全ての労働者が忙しいという過程で新しいジョブ

2ラインの90がnil労働者

3-a労働く解放され、信号はそれなりですが、信号を紛失したとしてThreadPoolではないのを待ってい

4つまでの線95待ちもあります。

のエラーはこちらできる信号http応答ヘッダのみが書き換えられる場合でも誰もができます。このThreadPool#信号のメソッドは:

def signal
     @mutex.synchronize { @cv.signal }
end

その問題点と同じ労働者のオブジェクトです。これは:

1-労働者だけでジョブ完了

2-かどうかをチェックし(17)がある場合について:はありません

3つのスレッドプールを送信した場合に、新しい仕事の信号で...その信号を紛失した

4つの労働者の待ち、信号にもすることができるものとして忙しい

ございますのでご注意下さいごの初期化方法:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

次に、労働者#get_blockと労働者#reset_blockメソッドは同期化されないことがあります。るというようにできない場合もありますのでブロックの割当て作業者との間で試験のためのブロックを待っています。

トップコメント投稿者のコードも行っています。こちらではの更新のためのruby2.xの改善とスレッドを同定するたさあどうだろうか。それぞれのスレッドIDで構成ThreadPoolと配列が格納されている任意の情報です。一部のアイデア:

  • まいません:代表的なThreadPoolます。でも、ギルでスレッドで死んやすいコードは、非常に有のための高レイテンシーのアプリケーションの大容量のウェブクローリング
  • ThreadPoolおよび配列の小数Cpu:やすいフォークプロセスをご利用の場合、Cpu
  • ThreadPoolおよび配列の小数資源例えば、配列要素を表すプロセッサー全体のプールの場合、そのまま10インスタンスそれぞれ4のCpuは、TP管理活動にも40のサブプロセス.

これらの最後の二つのでなく、考えるスレッドの仕事について考えThreadPoolサブプロセスの管理を行っている。の管理タスクは軽量化との組み合わせにサブプロセス、利益ばかり考えて行動している吉.

このクラスは、コーディングでき、クラスタリングに基づくMapReduce約数行のコード!このコードが美しく短いものでスタッフのお母さんの心-曲がりを完全にgrok.希望です。

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top