Question

I have an immutable Iterable<X> with a large number of elements. (It happens to be a List<>, but never mind that.)

What I would like to do is start a few parallel / asynchronous tasks to iterate over the Iterable<> with the same iterator, and I'm wondering what interface I should use.

Here's a sample implementation with the to-be-determined interface QuasiIteratorInterface:

public void process(Iterable<X> iterable)
{
   QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
   for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
   {
      SomeWorkerClass worker = new SomeWorkerClass(qit);
      worker.start();
   }
}

class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
   private final Iterator<T> iterator;
   private final Object lock = new Object();

   private ParallelIteratorWrapper(Iterator<T> iterator) {
      this.iterator = iterator;
   }

   public static <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
   {
      // explicit type argument avoids constructing a raw type
      return new ParallelIteratorWrapper<T>(iterable.iterator());
   }

   // Checks for and fetches the next element under one lock, so no other
   // thread can call next() in between; returns null once exhausted.
   private T getNextItem()
   {
      synchronized (lock)
      {
         if (this.iterator.hasNext())
            return this.iterator.next();
         else
            return null;
      }
   }

   /* QuasiIteratorInterface methods here */
}

Here's my problem:

  • it doesn't make sense to use Iterator directly, since calling hasNext() and then next() is a check-then-act race: the answer from hasNext() is useless if another thread calls next() before you do.

  • I'd love to use Queue, but the only method I need is poll()

  • I'd love to use ConcurrentLinkedQueue to hold my large number of elements... except I may have to iterate through the elements more than once, and poll() removes them from the queue, so I can't use that.
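The bullets boil down to needing a single atomic "give me the next element, or tell me you're done" operation. A minimal sketch of such a poll-only interface wrapping one shared Iterator behind a lock; the names PollingSource and LockedIteratorSource are hypothetical, and using null as the end marker assumes the Iterable contains no null elements:

```java
import java.util.Iterator;

// Hypothetical poll-only interface: one method replaces the
// hasNext()/next() pair, so callers cannot race between the two.
interface PollingSource<T> {
    // Returns the next element, or null once the source is exhausted.
    T poll();
}

// Wraps a single shared Iterator; poll() holds the lock across both
// hasNext() and next(), so another thread cannot slip in between them.
final class LockedIteratorSource<T> implements PollingSource<T> {
    private final Iterator<T> iterator;
    private final Object lock = new Object();

    private LockedIteratorSource(Iterator<T> iterator) {
        this.iterator = iterator;
    }

    // Each call obtains a fresh Iterator, i.e. starts a new pass.
    static <T> LockedIteratorSource<T> over(Iterable<T> iterable) {
        return new LockedIteratorSource<>(iterable.iterator());
    }

    @Override
    public T poll() {
        synchronized (lock) {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }
}
```

Because over() asks the Iterable for a new Iterator each time, a second pass just means calling it again; the underlying List is never consumed, unlike a ConcurrentLinkedQueue drained with poll().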

Any suggestions?


Solution

Create your own Producer interface with a poll() method or equivalent (Guava's Supplier, for instance). There are many implementation options, but if you have an immutable random-access list, you can simply maintain a thread-safe monotonic counter (an AtomicInteger, for instance) and call list.get(int), e.g.:

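import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Supplier;

class ListSupplier<T> implements Supplier<T> {
  private final AtomicInteger next = new AtomicInteger();
  private final List<T> elements; // ctor injected

  …
  @Override
  public T get() {
    // real impl more complicated due to bounds checks
    // and what to do when exhausted;
    // getAndIncrement() hands each caller a distinct index
    return elements.get(next.getAndIncrement());
  }
}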

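That is thread-safe (each call claims a distinct index, and the list itself is never modified), but you'd probably want to return either an Option-style value or null when the list is exhausted; as written, get() throws IndexOutOfBoundsException once the counter passes the end.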
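A self-contained sketch of the counter approach with the bounds check and the exhausted case filled in. It swaps Guava's Supplier for java.util.function.Supplier (Java 8+) so it has no dependencies, and returns Optional.empty() when exhausted; both are assumptions for illustration, not part of the original answer:

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hands out list elements by index; the AtomicInteger guarantees each
// index is claimed by exactly one caller, even under concurrent access.
final class ListSupplier<T> implements Supplier<Optional<T>> {
    private final AtomicInteger next = new AtomicInteger();
    private final List<T> elements;

    // Assumes an immutable random-access list with no null elements,
    // safely published to every thread that calls get().
    ListSupplier(List<T> elements) {
        this.elements = elements;
    }

    @Override
    public Optional<T> get() {
        int size = elements.size();
        // Advance only while below size, so calls made after exhaustion
        // leave the counter at size instead of growing until it overflows.
        int index = next.getAndUpdate(i -> i < size ? i + 1 : i);
        // Optional.of throws NullPointerException if an element is null.
        return index < size ? Optional.of(elements.get(index)) : Optional.empty();
    }
}
```

Since the supplier only reads the list, iterating more than once just means creating a new ListSupplier over the same list for each pass.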

OTHER TIPS

Have one dispatcher thread that iterates over the Iterable and dispatches elements to multiple worker threads that do the work on them. You can use a ThreadPoolExecutor to automate this.
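A minimal sketch of the dispatcher approach, assuming the per-element work can be expressed as a Function; the names Dispatcher, processAll and parallelism are hypothetical. Executors.newFixedThreadPool returns an ExecutorService backed by a ThreadPoolExecutor, and only the calling thread ever touches the Iterator, so no synchronization on it is needed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class Dispatcher {
    // The calling thread is the dispatcher: it walks the Iterable once and
    // submits one task per element to a fixed pool of worker threads.
    static <X, R> List<R> processAll(Iterable<X> iterable,
                                     Function<X, R> work,
                                     int parallelism)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (X item : iterable) {
                futures.add(pool.submit(() -> work.apply(item)));
            }
            // Collect results in input order; get() blocks until each task
            // finishes and rethrows a worker's exception as ExecutionException.
            List<R> results = new ArrayList<>();
            for (Future<R> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown(); // let the worker threads exit once tasks finish
        }
    }
}
```

One trade-off of this sketch: it keeps one Future per element, so for a very large Iterable you might prefer a bounded work queue instead of submitting everything up front.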

Licensed under: CC-BY-SA with attribution