java: itération simultanée sur un Iterable immuable
-
29-10-2019 - |
Question
J'ai un Iterable<X>
immuable avec un grand nombre d'éléments. (il se trouve que c'est un List<>
, mais tant pis.)
Ce que je voudrais faire, c'est démarrer quelques tâches parallèles / asynchrones pour itérer sur le Iterable<>
avec le même itérateur , et Je me demande quelle interface je devrais utiliser .
Voici un exemple d'implémentation avec l'interface à déterminer QuasiIteratorInterface
:
public void process(Iterable<X> iterable)
{
QuasiIteratorInterface<X> qit = ParallelIteratorWrapper.iterate(iterable);
for (int i = 0; i < MAX_PARALLEL_COUNT; ++i)
{
SomeWorkerClass worker = new SomeWorkerClass(qit);
worker.start();
}
}
class ParallelIteratorWrapper<T> implements QuasiIteratorInterface<T>
{
final private Iterator<T> iterator;
final private Object lock = new Object();
private ParallelIteratorWrapper(Iterator<T> iterator) {
this.iterator = iterator;
}
static public <T> ParallelIteratorWrapper<T> iterate(Iterable<T> iterable)
{
return new ParallelIteratorWrapper(iterable.iterator());
}
private T getNextItem()
{
synchronized(lock)
{
if (this.iterator.hasNext())
return this.iterator.next();
else
return null;
}
}
/* QuasiIteratorInterface methods here */
}
Voici mon problème:
-
cela n'a aucun sens d'utiliser
Iterator
directement, car hasNext () et next () ont un problème de synchronisation, où hasNext () est inutile si quelqu'un d'autre appelle next () avant vous. -
J'adorerais utiliser
Queue
, mais la seule méthode dont j'ai besoin estpoll()
-
J'adorerais utiliser ConcurrentLinkedQueue pour contenir mon grand nombre d'éléments ... sauf que je devrai peut-être parcourir les éléments plus d'une fois, donc je ne peux pas l'utiliser.
Des suggestions?
La solution
Créez votre propre interface Producer
avec la méthode poll()
ou l'équivalent (Guava's Supplier
par exemple).Les options d'implémentation sont nombreuses mais si vous avez une liste d'accès aléatoire immuable, vous pouvez simplement maintenir un compteur monotone thread-safe (AtomicInteger par exemple) et appeler list.get (int) par exemple:
class ListSupplier<T> implements Supplier<T> {
private final AtomicInteger next = new AtomicInteger();
private final List<T> elements; // ctor injected
…
public <T> get() {
// real impl more complicated due to bounds checks
// and what to do when exhausted
return elements.get(next.getAndIncrement());
}
}
C'est thread-safe, mais vous voudrez probablement renvoyer un élément de style Option ou null une fois épuisé.
Autres conseils
Avoir un thread de distribution qui itère sur Iterable et distribue des éléments à plusieurs threads de travail qui effectuent le travail sur les éléments.Vous pouvez utiliser ThreadPoolExecutor
pour automatiser cela.