문제

을 찾을 수 없습니다 괜찮은 ThreadPool 구현하기 위한,그래서 내가 쓴 내(반 부분에 코드에서 여기: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276 지만,변경이 기다릴/신호 및 다른 구현 ThreadPool 종료됩니다.그러나 몇 시간 후에 실행하는(는 100 개의 스레드 및 취급에 대해 1300 작업)의,그것은 사망으로 교착 상태에서 선 25-그것은 대기에 대한 새로운 정보가 없습니다.어떤 아이디어를,그것이 왜 일어날 수 있는가?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end
도움이 되었습니까?

해결책

Ok,그래서 중요한 문제로 구현은:는 방법을 확인하는 신호가 없을 잃고 방지 죽은 자물쇠?

내 경험에 의하면,이것은 정말로 달성하기 위해 열심히 노력과 상태변수 및 mutex,그러나 쉽게 세마포와 함께.그것은 그래서 일어나는 루비를 구현하는 객체라는 큐(또는 SizedQueue)해야 하는 이 문제를 해결합니다.여기에 내 건의 구현:

require 'thread'
begin
  require 'fasttread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end

class ThreadPool
  class Worker
    def initialize(thread_queue)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @queue = thread_queue
      @running = true
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            @cv.wait(@mutex)
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
            end
            @queue << self
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @block
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @block = nil
    end

    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    def stop
      @mutex.synchronize do
        @running = false
        @cv.signal
      end
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new
    @workers = []
  end

  def size
    @workers.size
  end

  def busy?
    @queue.size < @workers.size
  end

  def shutdown
    @workers.each { |w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    worker = get_worker
    worker.set_block(block)
  end

  private

  def get_worker
    if !@queue.empty? or @workers.size == @max_size
      return @queue.pop
    else
      worker = Worker.new(@queue)
      @workers << worker
      worker
    end
  end

end

그리고 여기에 간단한 테스트 코드:

tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown

다른 팁

을 시도할 수 있습니다 work_queue 보석,설계 작업을 조정 간에 생산자의 수영장 worker threads.

나는 약간 치우치지만,나는 것이 좋을 모델링 이 일부 프로세스에서 언어와 모델을 확인합니다.자유롭게 사용 가능한 도구는 다음과 같습니다,예를 들어,mCRL2 도구 세트를 사용하여(ACP 기반어),이동 작업(pi-미적분)및 스핀(PROMELA).

그렇지 않으면 좋을 제거하는 모든 코드의에 필수적인 문제를 찾는 최소한의 경우 교착 상태가 발생합니다.내가 심는 그것의 100 개의 스레드 및 1300 작업이 필수적인을 얻습니다.작은 경우를 추가하는 디버그 인쇄는 충분한 정보를 제공하면 문제를 해결합니다.

확인,문제의 것에 ThreadPool#신호 방법입니다.What 발생할 수 있습니다:

1-모든 작업자들이 바쁜 당신이 시도하는 과정을 새로운 작업

2-라인 90 얻 nil 자

3-작업자는 해제하고 신호로 그것은,그러나 신호의 손실로 ThreadPool 이 기다리지 않고 그것을 위해

4-에서 떨어지는 라인 95 을 기다리고,있어도 무료로 노동자입니다.

에 오류가 여기에는 신호할 수 있는 무료는 작업자람이 없을 때에도 의견을 듣는 것입니다.이 ThreadPool#신호 방법을 이어야 한다:

def signal
     @mutex.synchronize { @cv.signal }
end

그리고 문제에서도업체입니다.어떤 일이 일어날 수 있다:

1-작업자는 단지 완료된 작업

2-It 검사(line17)가 있으면 작업을 기다리고 있습니다:이 없

3-스레드 풀을 보낼 새로운 일자리와 신호 그것은...하지만 이 신호 분실

4-작업자 신호를 기다립니다,비록 그것이 바쁜으로 표시

당신의 초기화 방법:

def initialize(callback)
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @callback = callback
  @mutex.synchronize {@running = true}
  @thread = Thread.new do
    @mutex.synchronize do
      while @running
        block = get_block
        if block
          @mutex.unlock
          block.call
          @mutex.lock
          reset_block
          # Signal the ThreadPool that this worker is ready for another job
          @callback.signal
        else
          # Wait for a new job
          @cv.wait(@mutex)
        end
      end
    end
  end
end

다음 Worker#get_block 및 작업자#reset_block 방법을 동기화되지 않습니다 더 이상.그런 식으로,당신은 있을 수 없 블록 할당된 노동자 사이의 테스트에 대한 차단 및 기다리는 신호입니다.

최 주석 코드를 도왔다 너무 많습니다.여기에는 루비에 대해 업데이트 2.x 과 향상된 스레드 id.어떻게 개선?각 스레드 ID,을 작성할 수 있습 ThreadPool 배열로 저장하는 임의의 정보입니다.몇 가지 아이디어:

  • 아 배열:일반적인 ThreadPool 사용합니다.도 GIL 그것은 스레딩 죽은 쉽게 코드와 매우 유용한 지연이 응용 프로그램과 같은 높은 볼륨 웹 크롤링
  • ThreadPool 및 배열의 크기를 Cpu:을 쉽게 포크의 프로세스를 사용하여 모든 Cpu
  • ThreadPool 및 배열의 크기를 자원 수:예를 들어,각각 배열 요소 중 하나를 나타냅 프로세서에서는 수영장 등의 인스턴스,그래서 만약 당신이 10 인스턴스를 각각 4 개의 Cpu 가,TP 관리할 수 있는 작업에 걸쳐 40 하위.

이러한 마지막 두 아닌에 대해 생각하고 스레드 작업에 대해 생각 ThreadPool 관리 서브프는 작업을하고있다.관리 작업입니다 가볍고와 결합하면 하위 사람에 대한 관심도 만나볼 수 있습니다.

이 클래스 코드를 작성할 수 있습니다 클러스터를 기반으로 MapReduce 에 대한 내용이 라인의 코드!이 코드는 아름답게 짧은 될 수 있지만 조금의 마음을 굽히 grok.도움이 되기를 바랍니다.

# Usage:
#
#   Thread.abort_on_exception = true # help localize errors while debugging
#   pool = ThreadPool.new(thread_pool_size)
#   50.times {|i|
#     pool.process { ... }
#     or
#     pool.process {|id| ... } # worker identifies itself as id
#   }
#   pool.shutdown()

class ThreadPool

  require 'thread'

  class ThreadPoolWorker

    attr_accessor :id

    def initialize(thread_queue, id)
      @id = id # worker id is exposed thru tp.process {|id| ... }
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @idle_queue = thread_queue
      @running = true
      @block = nil
      @thread = Thread.new {
        @mutex.synchronize {
          while @running
            @cv.wait(@mutex) # block until there is work to do
            if @block
              @mutex.unlock
              begin
                @block.call(@id)
              ensure
                @mutex.lock
              end
              @block = nil
            end
            @idle_queue << self
          end
        }
      }
    end

    def set_block(block)
      @mutex.synchronize {
        raise RuntimeError, "Thread is busy." if @block
        @block = block
        @cv.signal # notify thread in this class, there is work to be done
      }
    end

    def busy?
      @mutex.synchronize { ! @block.nil? }
    end

    def stop
      @mutex.synchronize {
        @running = false
        @cv.signal
      }
      @thread.join
    end

    def name
      @thread.inspect
    end
  end


  attr_accessor :max_size, :queue

  def initialize(max_size = 10)
    @process_mutex = Mutex.new
    @max_size = max_size
    @queue = Queue.new # of idle workers
    @workers = []      # array to hold workers

    # construct workers
    @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }

    # queue up workers (workers in queue are idle and available to
    # work).  queue blocks if no workers are available.
    @max_size.times {|i| @queue << @workers[i] }

    sleep 1 # important to give threads a chance to initialize
  end

  def size
    @workers.size
  end

  def idle
    @queue.size
  end

  # are any threads idle

  def busy?
    # @queue.size < @workers.size
    @queue.size == 0 && @workers.size == @max_size
  end

  # block until all threads finish

  def shutdown
    @workers.each {|w| w.stop }
    @workers = []
  end

  alias :join :shutdown

  def process(block = nil, &blk)
    @process_mutex.synchronize {
      block = blk if block_given?
      worker = @queue.pop # assign to next worker; block until one is ready
      worker.set_block(block) # give code block to worker and tell it to start
    }
  end


end
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top