Question

I'm implementing a concurrent skip list map based on Java's ConcurrentSkipListMap, the differences being that I want the list to allow duplicates, and I also want the list to be indexable (so that finding the Nth element of the list takes O(lg(n)) time, instead of O(n) time as with a standard skip list). These modifications aren't presenting a problem.

In addition, the skip list's keys are mutable. For example, if the list elements are the integers {0, 4, 7}, then the middle element's key can be changed to any value in [0, 7] without prompting a change to the list structure; if the key changes to a value in (-inf, -1] or [8, +inf), then the element is removed and re-added to maintain the list order. Rather than implementing this as a removal followed by an O(lg(n)) insert, I implement it as a removal followed by a linear traversal and an O(1) insert (with an expected runtime of O(1), since 99% of the time the node will be swapped with an adjacent node).
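To make the relocation rule concrete, here is a minimal single-threaded sketch on a plain doubly linked list with sentinel nodes. It has no index levels and no CAS, and the names SketchNode, build, keysFrom and reposition are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded illustration only: a sorted doubly linked list bounded by
// a head sentinel (Integer.MIN_VALUE) and a tail sentinel (Integer.MAX_VALUE).
final class SketchNode {
    int key;
    SketchNode prev, next;

    SketchNode(int key) { this.key = key; }

    // Builds head <-> keys... <-> tail and returns the real (non-sentinel) nodes.
    static SketchNode[] build(int... keys) {
        SketchNode head = new SketchNode(Integer.MIN_VALUE);
        SketchNode tail = new SketchNode(Integer.MAX_VALUE);
        SketchNode[] nodes = new SketchNode[keys.length];
        SketchNode last = head;
        for (int i = 0; i < keys.length; i++) {
            nodes[i] = new SketchNode(keys[i]);
            last.next = nodes[i];
            nodes[i].prev = last;
            last = nodes[i];
        }
        last.next = tail;
        tail.prev = last;
        return nodes;
    }

    // Collects the keys of the real nodes in list order, starting from any node.
    static List<Integer> keysFrom(SketchNode any) {
        SketchNode n = any;
        while (n.prev != null) n = n.prev;            // rewind to the head sentinel
        List<Integer> out = new ArrayList<>();
        for (n = n.next; n.next != null; n = n.next) out.add(n.key);
        return out;
    }

    // Changes n's key; relocates n only if the new key falls outside [prev.key, next.key].
    static void reposition(SketchNode n, int newKey) {
        SketchNode p = n.prev, q = n.next;
        n.key = newKey;
        if (p.key <= newKey && newKey <= q.key) return; // position unchanged
        p.next = q;                                     // unlink n
        q.prev = p;
        if (newKey < p.key) {
            while (p.key > newKey) p = p.prev;          // backward linear traversal
        } else {
            while (q.key < newKey) q = q.next;          // forward linear traversal
            p = q.prev;
        }
        SketchNode after = p.next;                      // O(1) insert between p and after
        n.prev = p;
        n.next = after;
        p.next = n;
        after.prev = n;
    }
}
```

With the list {0, 4, 7}, changing the middle key to 5 leaves the links untouched, while changing it to 9 or -3 unlinks the node and walks one step at a time to its new position.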

Inserting a completely new node is rare (after startup), and deleting a node (without immediately re-adding it) never occurs; almost all of the operations are elementAt(i) to retrieve the element at the ith index, or operations to swap nodes after a key is modified.

The problem I'm running into is in how to implement the key modification class(es). Conceptually, I'd like to do something like

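import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Node implements Runnable {
    private volatile int key; // volatile: neighboring nodes read it from other threads
    private Node prev, next;
    private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

    public int getKey() {
        return key;
    }

    // May be called from any thread; the queue is unbounded, so offer() never blocks.
    public void update(int i) {
        queue.offer(i);
    }

    @Override
    public void run() {
        try {
            while (true) {
                int temp = queue.take() + key; // blocks until an update arrives
                if (prev.getKey() > temp) {
                    // remove node, update key to temp, perform backward linear traversal, and insert
                } else if (next.getKey() < temp) {
                    // remove node, update key to temp, perform forward linear traversal, and insert
                } else {
                    key = temp; // node doesn't change position
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and let the thread exit
        }
    }
}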

(The insert sub-method being called from run uses CAS in order to handle the problem of two nodes attempting to simultaneously insert at the same location (similar to how the ConcurrentSkipListMap handles conflicting inserts) - conceptually this is the same as if the first node locked the nodes adjacent to the insertion point, except that the overhead is reduced for the case where there's no conflict.)
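As a rough illustration of the CAS-based insert mentioned above, here is a minimal sketch for a sorted, singly linked, insert-only list. It is not the ConcurrentSkipListMap algorithm: it has no index levels, no prev pointers, and no deletion handling, and the names CasNode and insert are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Insert-only sorted list; each node's next pointer is updated with CAS.
final class CasNode {
    final int key;
    final AtomicReference<CasNode> next = new AtomicReference<>();

    CasNode(int key) { this.key = key; }

    // Inserts newNode at its sorted position somewhere after start.
    // Assumes start.key <= newNode.key and that nodes are never removed.
    static void insert(CasNode start, CasNode newNode) {
        while (true) {
            CasNode pred = start;
            CasNode succ = pred.next.get();
            while (succ != null && succ.key < newNode.key) { // linear traversal
                pred = succ;
                succ = pred.next.get();
            }
            newNode.next.set(succ);
            // Succeeds only if nobody linked another node after pred in the meantime.
            if (pred.next.compareAndSet(succ, newNode)) {
                return;
            }
            // Lost the race to a conflicting insert: rescan and retry.
        }
    }
}
```

When two threads target the same gap, exactly one compareAndSet succeeds and the other thread simply retries, which is why the uncontended case costs no more than a single CAS.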

This way I can ensure that the list is always in order (it's okay if a key update is a bit delayed, because I can be certain that the update will eventually happen; however, if the list becomes unordered then things might go haywire). The problem is that implementing the list this way requires one thread per Node, and with several thousand nodes in the list that is an awful lot of threads. Most of them will be blocked at any given point in time, but I'm concerned that several thousand blocked threads will still incur too much overhead.

Another option is to make the update method synchronized and remove the Runnable interface from Node, so that rather than having two threads enqueuing updates in the Node which then takes care of processing these updates on its separate thread, the two threads would instead take turns executing the Node#update method. The problem is that this could potentially create a bottleneck; if eight different threads all decided to update the same node at once then the queue implementation would scale just fine, but the synchronized implementation would block seven out of the eight threads (and would then block six threads, then five, etc).
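For comparison, the synchronized variant might look roughly like the sketch below. SyncNode is a hypothetical name, and the relocation logic is left as a comment, as in the code above.

```java
// Synchronized variant: callers apply their own updates, one at a time per node.
final class SyncNode {
    private int key;

    // Only one thread at a time runs this for a given node;
    // the others wait on the node's monitor.
    synchronized void update(int delta) {
        int temp = key + delta;
        // position check and any remove/traverse/insert would go here
        key = temp;
    }

    synchronized int getKey() {
        return key;
    }
}
```

No update is lost, but eight threads hitting the same node are serialized on its monitor, which is the bottleneck described above.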

So my question is: how would I implement something like the queue implementation with a reduced number of threads, or else something like the synchronized implementation without the potential bottleneck?


Solution

I think I may be able to solve this with a ThreadPoolExecutor, something like

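import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

public class Node {
    private volatile int key; // volatile: neighboring nodes read it from other threads
    private Node prev, next;
    private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean lock = new AtomicBoolean(false);
    private final ThreadPoolExecutor executor; // supplied by the caller, e.g. one pool for all nodes
    private final UpdateNode updater = new UpdateNode();

    public Node(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    public int getKey() {
        return key;
    }

    public void update(int i) {
        queue.offer(i);
        // Schedule the updater only if no updater currently owns this node.
        if (lock.compareAndSet(false, true)) {
            executor.execute(updater);
        }
    }

    private class UpdateNode implements Runnable {
        @Override
        public void run() {
            do {
                try {
                    int temp = key;
                    Integer delta;
                    while ((delta = queue.poll()) != null) { // drain all pending updates
                        temp += delta;
                    }
                    if (prev.getKey() > temp) {
                        // remove node, update key to temp, perform backward linear traversal, and insert
                    } else if (next.getKey() < temp) {
                        // remove node, update key to temp, perform forward linear traversal, and insert
                    } else {
                        key = temp; // node doesn't change position
                    }
                } finally {
                    lock.set(false);
                }
                // Re-check after releasing: an update may have arrived while the lock was held.
            } while (!queue.isEmpty() && lock.compareAndSet(false, true));
        }
    }
}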

This way I have the advantages of the queue approach without having a thousand threads sitting blocked. Instead, I execute an UpdateNode each time I need to update a node (unless an UpdateNode is already being executed on that Node, hence the AtomicBoolean acting as a lock), and I rely on the ThreadPoolExecutor to make it inexpensive to run several thousand runnables.
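To check that the offer-then-CAS handoff does not lose updates, here is a small self-contained sketch that keeps only the key accumulation and drops the list entirely. The class name DrainingCounter, its demo method, and the pool size of 4 are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class DrainingCounter {
    private volatile int key;
    private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final ExecutorService executor;

    DrainingCounter(ExecutorService executor) { this.executor = executor; }

    int getKey() { return key; }

    void update(int delta) {
        queue.offer(delta);
        // Submit a drain task only if none currently owns this counter.
        if (busy.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        do {
            try {
                int temp = key;
                Integer d;
                while ((d = queue.poll()) != null) temp += d;
                key = temp;
            } finally {
                busy.set(false);
            }
            // An update may have been offered while busy was still true: re-check.
        } while (!queue.isEmpty() && busy.compareAndSet(false, true));
    }

    // Eight producer threads add 1 a thousand times each; returns the final key.
    static int demo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        DrainingCounter node = new DrainingCounter(pool);
        Thread[] producers = new Thread[8];
        for (int t = 0; t < producers.length; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) node.update(1);
            });
            producers[t].start();
        }
        for (Thread p : producers) p.join();
        pool.shutdown();                             // already-submitted drains still run
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return node.getKey();
    }
}
```

Every offered value is picked up either by the drain task that is already running or by the one the offering thread submits, so demo() should return 8000 with only four pool threads rather than one thread per node.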

Licensed under: CC-BY-SA with attribution