Question

I wrote a simple producer-consumer program with a blocking queue, in which multiple producers and multiple consumers put integers on the queue and take them off. However, when I tested it, the results weren't what I expected; for example, the reported size of the queue is not correct. I don't think the producers and consumers are in sync with each other. Moreover, I put a 2 second sleep in both the producer and the consumer, but when testing, every two seconds it prints the results of 2 producers and 2 consumers. Does anyone know what I am doing wrong? Maybe I am starting the threads wrong? I commented out another way I tried, but the results were still wrong.

Results:

run:
Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0
Consuming 192     Thread-2 size left 0
Consuming 155     Thread-3 size left 0
Producing 155     Thread-1 size left 0
Producing 192     Thread-0 size left 0
Consuming 141     Thread-2 size left 1
Producing 141     Thread-0 size left 0
Producing 919     Thread-1 size left 0
Consuming 919     Thread-3 size left 0
Producing 361     Thread-1 size left 0
Producing 518     Thread-0 size left 0
Consuming 518     Thread-3 size left 0
Consuming 361     Thread-2 size left 0
Producing 350     Thread-0 size left 1
Consuming 350     Thread-3 size left 0
Consuming 767     Thread-2 size left 0
Producing 767     Thread-1 size left 0

Producer

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Producer task that puts five random integers (100..1000) on a shared queue,
 * pausing three seconds after each one.
 *
 * <p>The queue is supplied by the caller so that several producers and
 * consumers can work on the same instance.
 */
public class Producer implements Runnable {

    /** Shared queue; injected by the constructor, so no throwaway queue is allocated here. */
    private final BlockingQueue<Integer> items;

    /**
     * One generator per task instead of a new {@code Random} for every number.
     * {@code java.util.Random} is thread-safe, so this still works when the same
     * task object is handed to more than one thread.
     */
    private final Random random = new Random();

    /**
     * @param q queue the produced values are put on
     */
    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    /**
     * Returns a random integer in the inclusive range {@code [start, end]}.
     */
    private int generateRandomNumber(int start, int end) {
        return start + random.nextInt(end - start + 1);
    }

    /**
     * Produces five values. Stops early, with the interrupt status restored,
     * if the thread is interrupted while putting or sleeping.
     */
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                items.put(rand);
                // put() and size() are two separate queue operations: a consumer may take
                // the element in between, so the size printed here can already be stale.
                System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                // Restore the flag and stop: an interrupt is a request to cancel the task.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

Consumer

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Consumer task that repeatedly takes one integer from a shared queue, prints
 * it, and then pauses three seconds.
 */
public class Consumer implements Runnable {

    /** Shared queue; injected by the constructor, so no throwaway queue is allocated here. */
    private final BlockingQueue<Integer> items;

    /**
     * @param q queue the values are taken from
     */
    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    /**
     * Consumes until the thread is interrupted; on interruption the interrupt
     * status is restored and the method returns.
     */
    @Override
    public void run() {
        while (true) {
            try {
                // take() and size() are two separate queue operations, so the printed
                // size may already reflect puts/takes done by other threads in between.
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                // Without this exit the endless loop would swallow the interrupt and
                // the thread could never be shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

Test

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Entry point: connects two producer threads and two consumer threads to one
 * shared queue and starts them.
 */
public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

        // One task object of each kind is handed to two threads.
        Producer producerTask = new Producer(sharedQueue);
        Consumer consumerTask = new Consumer(sharedQueue);
        Thread[] workers = {
            new Thread(producerTask),
            new Thread(producerTask),
            new Thread(consumerTask),
            new Thread(consumerTask)
        };
        /*
        Thread t1 = new Thread(new Producer());
        Thread t2 = new Thread(new Producer());
        Thread t3 = new Thread(new Consumer());
        Thread t4 = new Thread(new Consumer());
        */
        // Start in creation order: producers first, then consumers.
        for (Thread worker : workers) {
            worker.start();
        }
    }
}

UPDATE: I tried to implement the reentrant lock but my program stops at the lock line. Any help? Consumer

import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Consumer task that removes elements from a shared {@code MyBlockingQ} in an
 * endless loop.
 */
public class Consumer implements Runnable {

    /** Queue wrapper shared with the producers. */
    private MyBlockingQ items;

    /**
     * @param q shared queue wrapper to remove elements from
     */
    public Consumer(MyBlockingQ q) {
        items = q;
    }

    /** Calls {@code remove()} on the shared queue forever, with no pause in between. */
    public void run() {
        for (;;) {
            items.remove();
        }
    }
}

Producer

import java.util.Random;

/**
 * Producer task that adds five random integers (100..1000) to a shared
 * {@code MyBlockingQ}.
 */
public class Producer implements Runnable {

    /** Queue wrapper shared with the consumers. */
    private MyBlockingQ items;

    /**
     * @param q shared queue wrapper to add elements to
     */
    public Producer(MyBlockingQ q) {
        items = q;
    }

    /**
     * Returns a random integer in the inclusive range {@code [start, end]}.
     */
    private int generateRandomNumber(int start, int end) {
        int span = end - start + 1;
        return new Random().nextInt(span) + start;
    }

    /** Adds five values back to back, without pausing. */
    public void run() {
        int remaining = 5;
        while (remaining-- > 0) {
            items.add(generateRandomNumber(100, 1000));
        }
    }
}

MyBlockingQ (shared resource)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Shared queue wrapper that prints one line for every element added or removed.
 *
 * <p>Each operation and its log line run under the same lock, so the printed
 * size always matches the state right after that operation.
 *
 * <p>A consumer that finds the queue empty waits on a {@code Condition} of that
 * lock instead of blocking inside {@code take()}. Waiting on the condition
 * releases the lock; blocking in {@code take()} while still holding it would
 * keep every producer out of {@link #add(Integer)} and hang the program.
 */
public class MyBlockingQ {

    /** Backing queue, created with the default (Integer.MAX_VALUE) capacity. */
    private final BlockingQueue<Integer> items = new LinkedBlockingQueue<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** Signalled by {@link #add(Integer)} each time an element becomes available. */
    private final java.util.concurrent.locks.Condition notEmpty = lock.writeLock().newCondition();

    public MyBlockingQ() {
    }

    /**
     * Appends {@code i}, prints the new size and wakes one waiting consumer.
     *
     * @param i value to enqueue; must not be {@code null}
     */
    public void add(Integer i) {
        lock.writeLock().lock();
        try {
            // With the default capacity, put() does not wait for free space while
            // the lock is held.
            items.put(i);
            System.out.println("Producing " + i + "     " + Thread.currentThread().getName() + " size left " + items.size());
            notEmpty.signal();
        } catch (InterruptedException ex) {
            // As before, the interruption is only logged. The interrupt flag is not
            // restored because the visible Consumer loops unconditionally and would
            // then fail immediately on every later call.
            Logger.getLogger(MyBlockingQ.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Waits until an element is available, removes it and prints the remaining size.
     */
    public void remove() {
        lock.writeLock().lock();
        try {
            // await() releases the lock while waiting, so producers can get into add().
            // Re-check in a loop: another consumer may have taken the element first.
            while (items.isEmpty()) {
                notEmpty.await();
            }
            // Non-empty and still under the lock, so take() returns without blocking.
            int taken = items.take();
            System.out.println("Consuming " + taken + "     " + Thread.currentThread().getName() + " size left " + items.size());
        } catch (InterruptedException ex) {
            // Logged only; see the note in add() about the interrupt flag.
            Logger.getLogger(MyBlockingQ.class.getName()).log(Level.SEVERE, null, ex);
        } finally {
            lock.writeLock().unlock();
        }
    }
}

Test

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

/**
 * Entry point: runs two producers and two consumers against one shared
 * {@code MyBlockingQ}.
 */
public class ProducerConsumer {
    public static void main(String args[]) {
        MyBlockingQ sharedQueue = new MyBlockingQ();

        System.out.println("starting");
        // Every thread gets its own task object; all of them share the queue.
        Thread[] workers = {
            new Thread(new Producer(sharedQueue)),
            new Thread(new Producer(sharedQueue)),
            new Thread(new Consumer(sharedQueue)),
            new Thread(new Consumer(sharedQueue))
        };
        for (Thread worker : workers) {
            worker.start();
        }
    }
}
Was it helpful?

Solution

You are probably confused by this part of the output:

Producing 425     Thread-0 size left 0
Consuming 890     Thread-3 size left 0
Consuming 425     Thread-2 size left 0
Producing 890     Thread-1 size left 0

Question: How come Thread-3 is consuming 890 before Thread-1 produces it?

Answer: Thread-3 is not consuming the item before Thread-1 has produced it; only the two printed lines appear out of order.

Why: When Thread-1 puts the item on the queue, Thread-3 is probably already waiting to take an item from the queue. So Thread-1 puts the item:

items.put(rand);

And BEFORE Thread-1 moves on to the next line and prints the info about the item it produced, Thread-3 executes the following line:

System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());

Only then Thread-1 executes its println:

System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());

Because of this you can see these funny results in the console.

You might want to read about synchronizing. There are 2 ways to solve your problem:

  • synchronized methods
  • synchronized statements (approach used by brimborium)

Synchronization locks access to the object that the synchronized block (or method) synchronizes on. That means every other thread has to wait for its turn before it can enter a block synchronized on the same object.

So if you use synchronization on items in both Producer and Consumer then:

  • Consumer cannot take items when Producer is putting them.
  • Producer cannot put items when Consumer is taking them.

In the case when items is empty and the Consumer's method locks items, the program will fall into a so-called deadlock. The Producer has to wait for the Consumer to unlock, but that will never happen, since the Consumer is waiting to take items (which have to be placed there by the Producer).

Moreover, I put a 2 second sleep on both producer and consumer but when testing, every two seconds it prints out the results of 2 producers and 2 consumers.

This is exactly what you should expect. In the Test class you are making 2 producers and 2 consumers.

Thread t1 = new Thread(producer);
Thread t2 = new Thread(producer);
Thread t3 = new Thread(consumer);
Thread t4 = new Thread(consumer);

t1.start();
t2.start();
t3.start();
t4.start();

OTHER TIPS

These two lines

items.put(rand);
System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());

are not synchronized. The producer might put numbers in the queue, but when the size of the queue is displayed from the thread which put in it, the consumer might have already consumed a number.

You need to synchronize access to items. I changed your example only slightly and the result looks good. Because of synchronization, you will also have to take care of deadlocks. In this case it should be fine as long as you don't synchronize over the items.take() in the Consumer though.

Your new Test:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Entry point: runs two producers and two consumers, each with its own task
 * object, against one shared queue.
 */
public class ProducerConsumer {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<>();

        // Created in the same order as they are started: producers, then consumers.
        Thread[] workers = new Thread[4];
        workers[0] = new Thread(new Producer(sharedQueue));
        workers[1] = new Thread(new Producer(sharedQueue));
        workers[2] = new Thread(new Consumer(sharedQueue));
        workers[3] = new Thread(new Consumer(sharedQueue));
        for (int idx = 0; idx < workers.length; idx++) {
            workers[idx].start();
        }
    }
}

The consumer

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Consumer task that repeatedly takes one integer from a shared queue, prints
 * it, and then pauses one second.
 */
public class Consumer implements Runnable {

    /** Shared queue; injected by the constructor, so no throwaway queue is allocated here. */
    private final BlockingQueue<Integer> items;

    /**
     * @param q queue the values are taken from
     */
    public Consumer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    /**
     * Consumes until the thread is interrupted; on interruption the interrupt
     * status is restored and the method returns.
     */
    @Override
    public void run() {
        while (true) {
            try {
                // take() is deliberately not wrapped in synchronized (items): blocking on
                // an empty queue while holding that monitor would lock out producers
                // that synchronize on the same object.
                System.out.println("Consuming " + items.take() + "     " + Thread.currentThread().getName() + " size left " + items.size());
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                // Without this exit the endless loop would swallow the interrupt and
                // the thread could never be shut down.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

And the producer

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Producer task that puts five random integers (100..1000) on a shared queue,
 * pausing one second after each one.
 */
public class Producer implements Runnable {

    /** Shared queue; injected by the constructor, so no throwaway queue is allocated here. */
    private final BlockingQueue<Integer> items;

    /** One generator per task instead of a new {@code Random} for every number. */
    private final Random random = new Random();

    /**
     * @param q queue the produced values are put on
     */
    public Producer(BlockingQueue<Integer> q) {
        this.items = q;
    }

    /**
     * Returns a random integer in the inclusive range {@code [start, end]}.
     */
    private int generateRandomNumber(int start, int end) {
        return start + random.nextInt(end - start + 1);
    }

    /**
     * Produces five values. Stops early, with the interrupt status restored,
     * if the thread is interrupted while putting or sleeping.
     */
    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            int rand = generateRandomNumber(100, 1000);
            try {
                // Keeps put() and the log line together with respect to other threads
                // that synchronize on items, i.e. other producers.
                // NOTE(review): a consumer that calls take() without holding this monitor
                // is not excluded and can still remove the element before size() is read.
                synchronized (items) {
                    items.put(rand);
                    System.out.println("Producing " + rand + "     " + Thread.currentThread().getName() + " size left " + items.size());
                }
                // Sleep outside the synchronized block so the monitor is not held while idle.
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
                // Restore the flag and stop: an interrupt is a request to cancel the task.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top