Domanda

I am trying to find a good way to achieve the following API:

void add(Object o);
void processAndClear();

The class would store the objects and upon calling processAndClear would iterate through the currently stored ones, process them somehow, and then clear the store. This class should be thread safe.

the obvious approach is to use locking, but I wanted to be more "concurrent". This is the approach which I would use:

class Store{
    private AtomicReference<CopyOnWriteArrayList<Object>> store = new AtomicReference<>(new CopyOnWriteArrayList <>());

    void add(Object o){
        store.get().add(o);
    }

    void processAndClear(){
        CopyOnWriteArrayList<Object> objects = store.get();
        store.compareAndSet(objects, new CopyOnWriteArrayList<>());
        for (Object object : objects) {
            //do sth
        }
    }
}

This would allow threads that try to add objects to proceed almost immediately without any locking/waiting for the xlearing to complete. Is this the more or less correct approach?

È stato utile?

Soluzione

Your above code is not thread-safe. Imagine the following:

  1. Thread A is put on hold at add() right after store.get()
  2. Thread B is in processAndClear(), replaces the list, processes all elements of the old one, then returns.
  3. Thread A resumes and adds a new item to the now obsolete list that will never be processed.

The probably easiest solution here would be to use a LinkedBlockingQueue, which would as well simplify the task a lot:

class Store{
  final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

  void add(final Object o){
    queue.put(o); // blocks until there is free space in the optionally bounded queue
  }

  void processAndClear(){
    Object element;
    while ((element = queue.poll()) != null) { // does not block on empty list but returns null instead
      doSomething(element);
    }
  }
}

Edit: How to do this with synchronized:

class Store{
  final LinkedList<Object> queue = new LinkedList<>(); // has to be final for synchronized to work

  void add(final Object o){
    synchronized(queue) { // on the queue as this is the shared object in question
      queue.add(o);
    }
  }

  void processAndClear() {
    final LinkedList<Object> elements = new LinkedList<>(); // temporary local list
    synchronized(queue) { // here as well, as every access needs to be properly synchronized
      elements.addAll(queue);
      queue.clear();
    }

    for (Object e : elements) { 
      doSomething(e); // this is thread-safe as only this thread can access these now local elements
    }
  }
}

Why this is not a good idea

Although this is thread-safe, it is much slower if compared to the concurrent version. Assume that you have a system with 100 threads that frequently call add, while one thread calls processAndClear. Then the following performance bottle-necks will occur:

  • If one thread calls add the other 99 are put on hold in the meantime.
  • During the first part of processAndClear all 100 threads are put on hold.

If you assume that those 100 adding threads have nothing else to do, you can easily show, that the application runs at the same speed as a single-threaded application minus the cost for synchronization. That means: adding will effectively be slower with 100 threads than with 1. This is not the case if you use a concurrent list as in the first example.

There will however be a minor performance gain with the processing thread, as doSomething can be run on the old elements while new ones are added. But again the concurrent example could be faster, as you could have multiple threads do the processing simultaneously.

Effectively synchronized can be used as well, but you will automatically introduce performance bottle-necks, potentially causing the application to run slower as single-threaded, forcing you to do complicated performance tests. In addition extending the functionality always contains a risk of introducing threading issues, as locking needs to be done manually.
A concurrent list in contrast solves all these problems without additional code and the code can easily changed or extended later on.

Altri suggerimenti

The class would store the objects and upon calling processAndClear would iterate through the currently stored ones, process them somehow, and then clear the store.

This seems like you should use a BlockingQueue for this task. Your add(...) method would add to the queue and your consumer would call take() which blocks waiting for the next item. The BlockingQueue (ArrayBlockingQueue is a typical implementation) takes care of all of the synchronization and signaling for you.

This means that you don't have to have a CopyOnWriteArrayList nor an AtomicReference. What you would lose is a collection and you can iterate through for other reasons than your post articulates currently.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top