Question

I have an array-based object which implements the following interface:

public interface PairSupplier<Q, E> {
     public int size();

     public Pair<Q, E> get(int index);
}

I would like to create a specific iterator over it:

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     //some magic
}

In the next method, I would like to return some element from the PairSupplier.

This element should be unique to the thread: other threads should not have this element.

Since PairSupplier has a finite size, this is not always possible, but I would like to get as close to it as I can.

The order of elements doesn't matter; a thread can take the same element at a different time.

Example with 2 threads and 5 elements, {1,2,3,4,5}:

Thread 1  | Thread 2
   1           2
   3           4
   5           1
   3           2
   4           5

My solution:

I create an AtomicInteger index, which I increment on every call to next.

PairSupplier<Q, E> pairs;
AtomicInteger index;

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     // After the counter overflows it becomes negative, so map it back into [0, size).
     int position = index.incrementAndGet() % pairs.size();
     if (position < 0) {
          position *= -1;
          position = pairs.size() - position;
     }
     return pairs.get(position);
}

pairs and index are shared among all threads.

I find that this solution does not scale (because all threads contend on the same increment). Does anyone have better ideas?

This iterator will be used by 50-1000 threads.


Solution

Your question details are ambiguous: your example suggests that two threads can be handed the same Pair, but you say otherwise in the description.

Since that is the more difficult case to achieve, I will offer an Iterable<Pair<Q,E>> that delivers Pairs one per thread until the supplier cycles; after that, it repeats.

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public interface Supplier<T> {
  public int size();

  public T get(int index);

}

public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}

public class IterableSupplier<T> implements Iterable<T> {
  // The common supplier to use across all threads.
  final Supplier<T> supplier;
  // The atomic counter.
  final AtomicInteger i = new AtomicInteger();

  public IterableSupplier(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public Iterator<T> iterator() {
    /**
     * You may create a new iterator for each thread: because every iterator shares
     * the same supplier and counter, each Pair is still handed to a different thread.
     *
     * You may also share the same iterator across multiple threads.
     *
     * No two threads will get the same Pair unless the sequence cycles.
     */
    return new ThreadSafeIterator();
  }

  private class ThreadSafeIterator implements Iterator<T> {
    @Override
    public boolean hasNext() {
      /**
       * Always true.
       */
      return true;
    }

    private int pickNext() {
      // Just grab one atomically.
      int pick = i.incrementAndGet();
      // Math.floorMod keeps the index in [0, size) even if the counter ever overflows to a negative value.
      int actual = Math.floorMod(pick, supplier.size());
      if (pick != actual) {
        // Wrap the counter back without spinning: if this CAS fails, another thread has
        // already moved the counter and "just someone" will manage the reset later.
        i.compareAndSet(pick, actual);
      }
      return actual;
    }

    @Override
    public T next() {
      return supplier.get(pickNext());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported.");
    }

  }

}

NB: I have adjusted the code a little to accommodate both scenarios. You can take an Iterator per thread or share a single Iterator across threads.

OTHER TIPS

You have a piece of information ("has anyone taken this Pair already?") that must be shared between all threads. So for the general case, you're stuck. However, if you have an idea of the size of your array and the number of threads, you could use buckets to make it less painful.

Let's suppose we know that there will be 1,000,000 array elements and 1,000 threads. Assign each thread a range (thread #1 gets elements 0-999, etc.). Now, instead of 1,000 threads contending for one AtomicInteger, you can have no contention at all!
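A minimal sketch of the fixed-range idea, assuming the element count and thread count are known up front and each thread knows its own zero-based index (so index 0 corresponds to "thread #1" above). `RangePartition` and `RangeCursor` are hypothetical names. Because each thread walks only its own slice, nothing is shared and nothing needs to be atomic.

```java
// Hypothetical helper: splits [0, size) into one contiguous range per thread (0-based index).
final class RangePartition {
    private RangePartition() {}

    /** Returns {start, endExclusive} of the slice owned by threadIndex. */
    static int[] rangeFor(int size, int threadCount, int threadIndex) {
        if (threadCount <= 0 || threadIndex < 0 || threadIndex >= threadCount) {
            throw new IllegalArgumentException("bad thread index or count");
        }
        int base = size / threadCount;
        int extra = size % threadCount; // the first 'extra' threads get one more element
        int start = threadIndex * base + Math.min(threadIndex, extra);
        int length = base + (threadIndex < extra ? 1 : 0);
        return new int[] {start, start + length};
    }
}

// Hypothetical per-thread cursor: owned by exactly one thread, so plain fields are enough.
final class RangeCursor {
    private final int start;
    private final int length;
    private int offset;

    RangeCursor(int start, int endExclusive) {
        if (endExclusive <= start) throw new IllegalArgumentException("empty range");
        this.start = start;
        this.length = endExclusive - start;
    }

    /** Next index in this thread's range, wrapping back to its start when the range is used up. */
    int next() {
        int index = start + offset;
        offset = (offset + 1) % length;
        return index;
    }
}
```

With 1,000,000 elements and 1,000 threads, index 1 gets the range 1000-1999; each thread would create its `RangeCursor` once and keep it in a local variable or a `ThreadLocal`.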

That works if you can be sure that all your threads will run at about the same pace. If you need to handle the case where thread #1 is sometimes busy doing other things while thread #2 is idle, you can modify your bucket pattern slightly: give each bucket its own AtomicInteger. Now each thread generally contends only on its own bucket's counter, but if its bucket is empty, it can move on to the next bucket.
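One way the per-bucket variant could look, as a sketch. `BucketedClaimer` and `claim` are hypothetical names, `homeBucket` is assumed to be the calling thread's own bucket number, and this version makes a single pass (it returns -1 once everything is claimed instead of cycling like the question's iterator).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: indices [0, size) split into buckets, each with its own AtomicInteger.
final class BucketedClaimer {
    private final int size;
    private final int bucketSize;
    private final AtomicInteger[] cursors; // next unclaimed offset inside each bucket

    BucketedClaimer(int size, int bucketCount) {
        if (size <= 0 || bucketCount <= 0) {
            throw new IllegalArgumentException("size and bucketCount must be positive");
        }
        this.size = size;
        this.bucketSize = (size + bucketCount - 1) / bucketCount; // ceiling division
        this.cursors = new AtomicInteger[bucketCount];
        for (int b = 0; b < bucketCount; b++) {
            cursors[b] = new AtomicInteger();
        }
    }

    /**
     * Claims one index, preferring homeBucket and moving on to the following
     * buckets only when it is empty. Returns -1 once every index has been claimed.
     */
    int claim(int homeBucket) {
        int buckets = cursors.length;
        for (int step = 0; step < buckets; step++) {
            int b = Math.floorMod(homeBucket + step, buckets);
            int start = b * bucketSize;
            int length = Math.min(start + bucketSize, size) - start;
            // Cheap read first, so exhausted counters are not incremented forever.
            if (length <= 0 || cursors[b].get() >= length) {
                continue; // empty trailing bucket, or already used up
            }
            int offset = cursors[b].getAndIncrement();
            if (offset < length) {
                return start + offset;
            }
            // Lost the race for this bucket's last slot; try the next bucket.
        }
        return -1;
    }
}
```

In the common case a thread only ever touches its home bucket's counter, so contention is limited to the threads sharing that bucket plus the occasional thread that has run out of work.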

I'm having some trouble understanding what problem you are trying to solve.

Does each thread process the whole collection?

Is the concern that no two threads can work on the same Pair at the same time? But each thread needs to process each Pair in the collection?

Or do you want the collection processed once by using all of the threads?

There is one key thing which is obscure in your example: what exactly does this mean?

The order of elements doesn't matter; a thread can take the same element at a different time.

What does "different time" mean? Within N milliseconds of each other? Does it mean that two threads must absolutely never touch the same Pair at the same time? I will assume that.

If you want to decrease the probability that threads will block on each other contending for the same Pair, and there is a backing array of Pairs, try this:

  • Partition your array into threadCount sub-arrays of numPairs / threadCount elements each (you don't have to actually create sub-arrays, just start at different offsets, but it's easier to think about them as sub-arrays)
  • Assign each thread to a different sub-array; when a thread exhausts its sub-array, move it on to the next sub-array
    • Say we have 6 Pairs and 2 threads: your assignments look like Thread-1:[0,1,2] Thread-2:[3,4,5]. When Thread-1 starts, it will be looking at a different set of Pairs than Thread-2, so it is unlikely that they will contend for the same Pair
  • If it is important that two threads really not touch a Pair at the same time, then wrap all of the code which touches a Pair object in synchronized(pair) (synchronize on the instance, not the type!) - there may occasionally be blocking, but you're never blocking all threads on a single thing, as with the AtomicInteger - threads can only block each other because they are really trying to touch the same object
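A sketch of the offset walk combined with per-instance locking. `OffsetWalker` and `walk` are hypothetical names, the work done on each Pair is passed in as a `Consumer` (an assumption), and each call makes one full pass starting at `threadIndex * (numPairs / threadCount)`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: thread k starts in sub-array k and walks on through the others,
// locking only the element it is currently working on.
final class OffsetWalker<T> {
    private final List<T> items;
    private final int threadCount;

    OffsetWalker(List<T> items, int threadCount) {
        if (threadCount <= 0) throw new IllegalArgumentException("threadCount must be positive");
        this.items = items;
        this.threadCount = threadCount;
    }

    /** Processes every element once, beginning at this thread's own offset. */
    void walk(int threadIndex, Consumer<? super T> work) {
        int n = items.size();
        if (n == 0) return;
        int chunk = Math.max(1, n / threadCount); // elements per sub-array
        int start = Math.floorMod(threadIndex * chunk, n);
        for (int i = 0; i < n; i++) {
            T item = items.get((start + i) % n);
            synchronized (item) { // lock this instance only, never the class
                work.accept(item);
            }
        }
    }
}
```

With 6 Pairs and 2 threads, index 0 visits 0,1,2,3,4,5 and index 1 visits 3,4,5,0,1,2, so the two only meet on the same lock when one overtakes the other.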

Note this is not guaranteed never to block - for that, all threads would have to run at exactly the same speed, and processing every Pair object would have to take exactly the same amount of time, and the OS's thread scheduler would have to never steal time from one thread but not another. You cannot assume any of those things. What this gives you is a higher probability that you will get better concurrency, by dividing the areas to work in and making the smallest unit of state that is shared be the lock.

But this is the usual pattern for getting more concurrency on a data structure - partition the data between threads so that they rarely are touching the same lock at the same time.

The easiest approach I see is to create a HashSet or Map and give every thread a unique hash key. After that, each thread just does a simple get with its own key.
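The answer above is brief, so the following is only one possible reading of it: each thread is mapped once to its own slot, and after that it only does lookups. `PerThreadSlots` and `mine` are hypothetical names; keying on `Thread.getId()` is an assumption (that method is deprecated in favour of `threadId()` since Java 19, and IDs can be reused after a thread terminates). Threads share elements only once they outnumber them.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "one key per thread": a slot is assigned once per thread, then only looked up.
final class PerThreadSlots<T> {
    private final List<T> items;
    private final ConcurrentHashMap<Long, Integer> slotByThread = new ConcurrentHashMap<>();
    private final AtomicInteger nextSlot = new AtomicInteger();

    PerThreadSlots(List<T> items) {
        if (items.isEmpty()) throw new IllegalArgumentException("no items");
        this.items = items;
    }

    /** Returns the element reserved for the calling thread. */
    T mine() {
        long key = Thread.currentThread().getId();
        // The shared counter is touched once per thread, not once per call.
        int slot = slotByThread.computeIfAbsent(key, k -> nextSlot.getAndIncrement());
        return items.get(slot % items.size());
    }
}
```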

This is a standard Java Semaphore use case. The Semaphore javadoc gives an example very similar to your problem: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html
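A sketch modelled on the `Pool` example in that javadoc: a `Semaphore` with one permit per element bounds how many threads can hold an element, and a synchronized `used` array records which ones are taken. `ExclusivePool`, `acquire` and `release` are hypothetical names, and each thread is assumed to release every element it acquires.

```java
import java.util.List;
import java.util.concurrent.Semaphore;

// Sketch after the Semaphore javadoc's Pool example: at most one thread holds each element at a time.
final class ExclusivePool<T> {
    private final List<T> items;
    private final boolean[] used;
    private final Semaphore available;

    ExclusivePool(List<T> items) {
        this.items = items;
        this.used = new boolean[items.size()];
        this.available = new Semaphore(items.size(), true); // fair: waiting threads are served in order
    }

    /** Blocks until some element is free, then marks it as taken by the caller. */
    T acquire() throws InterruptedException {
        available.acquire();
        return takeFree();
    }

    /** Returns an element previously obtained from acquire(). */
    void release(T item) {
        if (markFree(item)) {
            available.release();
        }
    }

    private synchronized T takeFree() {
        for (int i = 0; i < used.length; i++) {
            if (!used[i]) {
                used[i] = true;
                return items.get(i);
            }
        }
        return null; // not reached while permits match free slots
    }

    private synchronized boolean markFree(T item) {
        for (int i = 0; i < used.length; i++) {
            if (items.get(i) == item && used[i]) {
                used[i] = false;
                return true;
            }
        }
        return false;
    }
}
```

Unlike the AtomicInteger approach, a thread that asks when every element is taken blocks in `acquire()` until another thread calls `release()`.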

If you need more help, let me know.

I prefer a lock and release process.

When a thread asks for a Pair object, that Pair is removed from the supplier. Before the thread asks for a new Pair, the 'old' Pair is added to the supplier again.

You can take from the front and put back at the end.
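A minimal sketch of that take-and-return cycle using a `LinkedBlockingQueue` (the choice of queue is an assumption). While a thread holds an element it is not in the queue, so no other thread can be handed it. `CheckoutQueue`, `take` and `giveBack` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: take from the front, put back at the end.
final class CheckoutQueue<T> {
    private final LinkedBlockingQueue<T> free;

    CheckoutQueue(List<T> items) {
        this.free = new LinkedBlockingQueue<>(items); // starts with every element available
    }

    /** Removes the head element, waiting if every element is currently checked out. */
    T take() throws InterruptedException {
        return free.take();
    }

    /** Puts a previously taken element back at the tail. */
    void giveBack(T item) {
        free.offer(item); // the queue is unbounded, so offer always succeeds
    }
}
```

A thread would call `giveBack(old)` before `take()` for its next Pair, matching the order described above.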

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow