Question

In a Ruby application I have a bunch of tasks which share no state, and I want to launch many of them at a time. Crucially, I don't care about the order they are started in, nor about their return values (each will commit its own database transactions before it completes). I'm aware that, depending on my Ruby implementation, the GIL may prevent these tasks from actually running at the same time, but that's OK because I'm not interested in true parallelism: these worker threads will be IO-bound on network requests anyway.

What I've got so far is this:

def asyncDispatcher(numConcurrent, stateQueue, &workerBlock)
  workerThreads = []

  while not stateQueue.empty?
    while workerThreads.length < numConcurrent
      nextState = stateQueue.pop

      nextWorker =
        Thread.new(nextState) do |st|
          workerBlock.call(st)
        end

      workerThreads.push(nextWorker)
    end # inner while

    workerThreads.delete_if{|th| not th.alive?} # clean up dead threads
  end # outer while

  workerThreads.each{|th| th.join} # join any remaining workers
end # asyncDispatcher

And I invoke it like this:

asyncDispatcher(2, (1..10).to_a ) {|x| x + 1}

Are there any lurking bugs or concurrency pitfalls here? Or perhaps something in the runtime which would simplify this task?


Solution

Use a Queue:

require 'thread'

def asyncDispatcher(numWorkers, stateArray, &processor)
  q = Queue.new
  threads = []

  (1..numWorkers).each do |worker_id|
    threads << Thread.new(processor, worker_id) do |processor, worker_id|
      while true
        next_state = q.shift            #shift() blocks while q is empty, which it is until the states are pushed below
        break if next_state.equal?(q)   #The queue itself is the sentinel: it can't appear in your data
        processor.call(next_state, worker_id)
      end
    end
  end

  stateArray.each {|state| q.push state}
  numWorkers.times {q.push q}    #One sentinel per worker, so every thread pops one and exits

  threads.each(&:join)
end


asyncDispatcher(2, (1..10).to_a) do |state, worker_id|
  time = sleep(Random.rand 10)  #How long it took to process state
  puts "#{state} is finished being processed: worker ##{worker_id} took #{time} secs."
end

--output:--
2 is finished being processed: worker #1 took 4 secs.
3 is finished being processed: worker #1 took 1 secs.
1 is finished being processed: worker #2 took 7 secs.
5 is finished being processed: worker #2 took 1 secs.
6 is finished being processed: worker #2 took 4 secs.
7 is finished being processed: worker #2 took 1 secs.
4 is finished being processed: worker #1 took 8 secs.
8 is finished being processed: worker #2 took 1 secs.
10 is finished being processed: worker #2 took 3 secs.
9 is finished being processed: worker #1 took 9 secs.
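
As for "something in the runtime which would simplify this task": if you are on Ruby 2.3 or later, Queue#close can stand in for the sentinel. The following is only a sketch under that assumption. The name async_dispatch_closing is made up for illustration, and it assumes nil and false never appear as states, because pop returns nil once a closed queue has been drained.

```ruby
require 'thread'

# Hypothetical variant of asyncDispatcher; assumes Ruby >= 2.3 for Queue#close.
def async_dispatch_closing(num_workers, states, &processor)
  queue = Queue.new
  states.each { |state| queue.push(state) }
  queue.close # no more pushes; pop returns nil once the queue is empty

  threads = (1..num_workers).map do |worker_id|
    Thread.new do
      # Assumption: nil/false are never real states, so a falsy pop means "done".
      while (state = queue.pop)
        processor.call(state, worker_id)
      end
    end
  end

  threads.each(&:join)
end
```

It is called the same way as asyncDispatcher. The trade-off is that the queue is filled before the workers start, which is fine here because the full list of states is known up front.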

Okay, okay, someone is going to look at the sample output and cry out:

"Hey, #2 took a total of 13 seconds to do four jobs in a row, while #1 took only 8 secs. for a single job, so #1's output for the 8 sec. job should have come earlier. There's no thread switching in Ruby! Ruby is broken!"

Well, while #1 was sleeping through its first two jobs (5 seconds in total), #2 was sleeping at the same time, so #2 had only 2 seconds left to sleep when #1 finished its first two jobs. Replace #2's 7 secs with 2 secs, and you'll see that after #1 finished its first two jobs, #2 took a total of 8 seconds for its run of four jobs in a row, which tied #1's 8-second job.
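
If you'd rather check that arithmetic than take my word for it, here is a small sketch that replays the sample run without sleeping. The JOBS table is copied by hand from the output above, finish_times is a made-up helper name, and it assumes both workers started at t=0 and that the reported whole-second durations are exact.

```ruby
# Per-worker job lists, in the order each worker ran them,
# as [state, seconds] pairs read off the sample output.
JOBS = {
  1 => [[2, 4], [3, 1], [4, 8], [9, 9]],
  2 => [[1, 7], [5, 1], [6, 4], [7, 1], [8, 1], [10, 3]]
}

# Hypothetical helper: each worker runs its jobs back to back, so a job's
# finish time is the running sum of that worker's durations.
def finish_times(jobs)
  jobs.flat_map do |worker_id, list|
    clock = 0
    list.map do |state, secs|
      clock += secs
      { state: state, worker: worker_id, finished_at: clock }
    end
  end
end

timeline = finish_times(JOBS).sort_by { |entry| entry[:finished_at] }
timeline.each do |entry|
  puts "t=#{entry[:finished_at]}s: state #{entry[:state]} done (worker ##{entry[:worker]})"
end
```

States 7 and 4 both land at t=13, which is the tie described above, so which of those two lines gets printed first is up to the scheduler.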

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow