Use a Queue:
require 'thread'
def asyncDispatcher(numWorkers, stateArray, &processor)
  q = Queue.new
  threads = []
  (1..numWorkers).each do |worker_id|
    threads << Thread.new(processor, worker_id) do |processor, worker_id|
      while true
        next_state = q.shift # shift() blocks while q is empty, which is the case right now
        break if next_state.equal?(q) # The queue itself is the sentinel: it won't appear in your data
        processor.call(next_state, worker_id)
      end
    end
  end
  stateArray.each { |state| q.push state }
  numWorkers.times { q.push q } # One sentinel per worker, so every thread gets to break out
  threads.each(&:join)
end
asyncDispatcher(2, (1..10).to_a) do |state, worker_id|
  time = sleep(Random.rand 10) # How long it took to process state (sleep returns the whole seconds slept)
  puts "#{state} is finished being processed: worker ##{worker_id} took #{time} secs."
end
--output:--
2 is finished being processed: worker #1 took 4 secs.
3 is finished being processed: worker #1 took 1 secs.
1 is finished being processed: worker #2 took 7 secs.
5 is finished being processed: worker #2 took 1 secs.
6 is finished being processed: worker #2 took 4 secs.
7 is finished being processed: worker #2 took 1 secs.
4 is finished being processed: worker #1 took 8 secs.
8 is finished being processed: worker #2 took 1 secs.
10 is finished being processed: worker #2 took 3 secs.
9 is finished being processed: worker #1 took 9 secs.
Okay, okay, someone is going to look at that output and cry out,
"Hey, #2 took a total of 13 seconds to do four jobs in a row, while #1 took only 8 secs. for one job, so #1's output for the 8 sec. job should have come earlier. There's no thread switching in Ruby! Ruby is broken!"
Well, while #1 was sleeping through its first two jobs for a total of 5 seconds, #2 was sleeping at the same time, so #2 had only 2 more seconds left to sleep when #1 finished its first two jobs. So replace #2's 7 secs with 2 secs, and you'll see that after #1 finished its first two jobs, #2 took a total of 8 seconds for its run of four jobs in a row, which tied #1's 8 second job.
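If you'd rather check that arithmetic than take my word for it, here is a rough sketch that rebuilds the wall-clock timeline from the output above. The RUNS table and the finish_times helper are names I made up for this illustration, and the sketch assumes both workers started at t = 0 and that every sleep lasted a whole number of seconds.

```ruby
# Jobs each worker ran, in order, as [state, seconds] pairs read off the output above.
# Assumption: both workers started at t = 0 and thread start-up time is negligible.
RUNS = {
  1 => [[2, 4], [3, 1], [4, 8], [9, 9]],
  2 => [[1, 7], [5, 1], [6, 4], [7, 1], [8, 1], [10, 3]]
}

# Turn each worker's durations into wall-clock finish times with a running sum.
def finish_times(runs)
  runs.flat_map do |worker_id, jobs|
    clock = 0
    jobs.map do |state, secs|
      clock += secs
      { state: state, worker: worker_id, finished_at: clock }
    end
  end
end

# Merge both workers' events into one timeline ordered by finish time.
timeline = finish_times(RUNS).sort_by { |event| event[:finished_at] }
timeline.each do |event|
  puts "t=#{event[:finished_at]}s: state #{event[:state]} done (worker ##{event[:worker]})"
end
```

States 7 and 4 both come out at t=13, which is the tie described above. Which of the two prints first in a real run is up to the thread scheduler, so don't read anything into their relative order.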