Question

I am learning concurrent programming and wrote this concurrentLinkeQueue using AtomicReference.

Following Example goes into Deadlock. Please see.

 package concurrent.AtomicE;

 import java.util.concurrent.atomic.AtomicReference;

 public class ConcurrentLinkQueue<V> {
private AtomicReference<Node> head = new AtomicReference<Node>();

public void offer(final V data) {
    final Node<V> newNode = new Node<V>(data,Thread.currentThread().getName());
    System.out.println("*********** NEW "+ newNode);
    AtomicReference<Node> pointer = head;
    for(;;){
         if(pointer.get() == null){ // Threads wait here for infinite time
             final boolean success = pointer.compareAndSet(null,newNode);
             System.out.println(Thread.currentThread().getName() +" " + success);
             if(success)
             {
                 System.out.println(Thread.currentThread().getName() +"Returning");
                 return;
             }else{
                 final Node<V> current = pointer.get();
                 pointer = current.next;
                 System.out.println(Thread.currentThread().getName() +" Next Pointer");
             }
        }
    }
}

public void printQueueData(){
    AtomicReference<Node> pointer = head;
    for(;pointer!=null;){
        final Node node = pointer.get();
        System.out.println(node);
        pointer = node.next;
    }
}

private static class Node<V>{
    private AtomicReference<Node> next;
    private volatile V data = null;
    private String threadName = "";

    Node(V data1,String threadName){
        this.data = data1;
        this.threadName = threadName;
    }

    @Override
    public String toString() {
        return  "threadName=" + threadName +
                ", data=" + data;
    }

    private AtomicReference<Node> getNext() {
        return next;
    }

    private void setNext(AtomicReference<Node> next) {
        this.next = next;
    }

    private V getData() {
        return data;
    }

    private void setData(V data) {
        this.data = data;
    }
}

}

      package concurrent.AtomicE;

 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

 public class Main {
private static final ConcurrentLinkQueue<Integer> clq =  new ConcurrentLinkQueue<Integer>();

public static void main(String[] args) throws InterruptedException {
    Task t = new Task();
    Thread t1 = new Thread(t); t1.setName("t1");
    Thread t2 = new Thread(t); t2.setName("t2");
    //Thread t3 = new Thread(t); t3.setName("t3");
    //Thread t4 = new Thread(t); t4.setName("t4");
    //Thread t5 = new Thread(t); t5.setName("t5");

    t1.start();
    t2.start();
    //t3.start();
    //t4.start();
    //t5.start();

    t1.join();
    t2.join();
    //t3.join();
    //t4.join();
    //t5.join();


}

private static class Task implements Runnable{

    @Override
    public void run() {
        for(int i=0;i<5;++i){
            clq.offer(i);
        }
    }
}

}

after taking thread dump it shows that threads wait forever at following line

 if(pointer.get() == null){ // Threads wait here for infinite time

can you please help why threads wait here forever?

[EDIT] Solved it --->

 public class ConcurrentLinkQueue<V> {
  private final AtomicReference<Node> firstNodePointer = new AtomicReference<Node>();

public void offer(final V data) {
    final Node<V> newNode = new Node<V>(data,Thread.currentThread().getName());
    System.out.println(newNode);
    final Node<Integer> firstNode = firstNodePointer.get();
    if(firstNode == null){
        if(firstNodePointer.compareAndSet(null,newNode) == true)
            return;
    }
    boolean success = false;
    Node<Integer> nodePointer = firstNode;
    AtomicReference<Node> atomicRefPointer = firstNodePointer;
    while(!success){
        atomicRefPointer = nodePointer.getNext();
        if(atomicRefPointer.get() == null){
            success = atomicRefPointer.compareAndSet(null,newNode);
        }else{
            nodePointer = atomicRefPointer.get();
        }
    }
}

}

Another Solution->

      public void fastOffer(final V data){
    final Node<V> newNode = new Node<V>(data,Thread.currentThread().getName());
    System.out.println(newNode);
    AtomicReference<Node> pointer = firstNodePointer;
    for(;;){
            if(pointer.compareAndSet(null,newNode)){
                return;
            }

        pointer = pointer.get().getNext();
    }
}
Was it helpful?

Solution

In your example condition pointer.get() == null always returns false excepts first case when you assign it to head, because in Node class it null. You can assign it with default value and remove null check.

I suggest you to change a bit Node class, make it immutable:

 private static class Node<V> {
        private final AtomicReference<Node> next = new AtomicReference<>();
        private final V data;
        private final String threadName;

        Node(V data1, String threadName) {
            this.data = data1;
            this.threadName = threadName;
        }
    }

And then you can simple go through all elements:

private final AtomicReference<Node> head = new AtomicReference<>();

@SuppressWarnings("unchecked")
public void offer(final V data) {
    // create new Node
    final Node<V> newNode = new Node<>(data, Thread.currentThread().getName());
    // set root element if it's null
    if (head.compareAndSet(null, newNode)) {
        return;
    }
    // else pass trough all elements and try to set new
    Node<V> pointer = head.get();
    for (;;) {
        if (pointer.next.compareAndSet(null, newNode)) {
            break;
        }
        pointer = pointer.next.get();
    }
}

And change print method:

    @SuppressWarnings("unchecked")
    public void printQueueData() {
        AtomicReference<Node> pointer = head;
        while (pointer.get() != null) {
            System.out.println(pointer.get().data);
            pointer = pointer.get().next;
        }
    }
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top