Question

I need to implement a producer-consumer example. This is a simple program that I modified a bit, but I'm not sure whether it has potential problems, and I would appreciate help refining it. My main issue right now is that I don't know how to stop the consumer when the producer is done.

I have tried the following code, but stop() is deprecated, and it also doesn't work:

    if (!producer.isAlive()) {
        consumer.stop();
    }

ProducerConsumer.java:

    import java.util.Vector;

    public class ProducerConsumer {
        public static void main(String[] args) {
            int size = 5;
            Vector<Integer> sQ = new Vector<Integer>(size);
            Thread consumer = new Thread(new Consumer(sQ, size));
            Thread producer = new Thread(new Producer(sQ, size));
            consumer.start();
            producer.start();

            if (!producer.isAlive()) {
                consumer.stop();
            }
        }
    }

    class Consumer implements Runnable {
        Vector<Integer> sQ = new Vector<Integer>();
        int size;

        public Consumer(Vector<Integer> sQ, int size) {
            this.sQ = sQ;
            this.size = size;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    System.out.println("Consuming element: " + consume());
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

        private int consume() throws InterruptedException {
            synchronized (sQ) {

                while (sQ.isEmpty()) {
                    System.out.println("The queue is empty and "
                            + Thread.currentThread().getName() + " has to wait."
                            + "size is: " + sQ.size());
                    sQ.wait();
                }

                sQ.notifyAll();
                return sQ.remove(0);
            }

        }

    }

    class Producer implements Runnable {
        Vector<Integer> sQ = new Vector<Integer>();
        int size;

        public Producer(Vector<Integer> sQ, int size) {
            this.sQ = sQ;
            this.size = size;
        }

        @Override
        public void run() {
            for (int i = 0; i < 12; ++i) {
                try {
                    produce(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        private void produce(int i) throws InterruptedException {

            synchronized (sQ) {
                while (sQ.size() == size) {
                    System.out.println("The queue is full and "
                            + Thread.currentThread().getName() + " has to wait."
                            + "size is: " + sQ.size());
                    sQ.wait();
                }

                sQ.add(i);
                sQ.notify();
            }

        }
    }

Solution

The generally recommended approach is to give each thread that needs to be terminated a boolean flag (finished or similar) and have it loop while (!finished). (Note that the flag generally needs to be volatile so that the thread sees changes made by other threads.) If the thread may be blocked, for example in wait() or sleep(), you can interrupt() it so that it wakes up with an InterruptedException and re-checks the flag.
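
A minimal sketch of the flag-plus-interrupt idea, assuming the consumer blocks in wait() on the shared queue as in the question. The names StoppableConsumer, finished, requestStop and demo are hypothetical and not part of the original code. This variant drains whatever is still queued before exiting, which is one possible design choice rather than the only one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; a sketch, not a drop-in replacement.
class StoppableConsumer implements Runnable {
    private final List<Integer> queue;                    // shared queue, guarded by its own monitor
    private final List<Integer> consumed = new ArrayList<>();
    private volatile boolean finished = false;            // volatile so the consumer sees the write

    StoppableConsumer(List<Integer> queue) {
        this.queue = queue;
    }

    // Called from another thread once the producer is done.
    void requestStop(Thread consumerThread) {
        finished = true;
        consumerThread.interrupt();                       // wakes the consumer if it is blocked in wait()
    }

    List<Integer> getConsumed() {
        return consumed;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (queue) {
                while (queue.isEmpty() && !finished) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        // Interrupted: loop around and re-check the flag.
                    }
                }
                if (queue.isEmpty()) {
                    return;                               // finished and nothing left to drain
                }
                consumed.add(queue.remove(0));
                queue.notifyAll();
            }
        }
    }

    // Demo: produce 0..11, then ask the consumer to stop; returns what was consumed.
    static List<Integer> demo() {
        List<Integer> queue = new ArrayList<>();
        StoppableConsumer consumer = new StoppableConsumer(queue);
        Thread thread = new Thread(consumer);
        thread.start();
        for (int i = 0; i < 12; i++) {
            synchronized (queue) {
                queue.add(i);
                queue.notifyAll();
            }
        }
        consumer.requestStop(thread);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.getConsumed();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the flag is only set after the last element has been added, the consumer sees all twelve elements before it returns.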

The overall approach you're taking seems to be out of date, though. The BlockingQueue implementations were designed specifically to ease producer-consumer implementations, and many such problems can be more effectively handled by using an Executor and firing off tasks to it as they come in instead of manually queuing and polling.
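
As a rough illustration of the BlockingQueue route, the sketch below replaces the manual wait()/notify() code with put() and take(). Stopping the consumer is done here with an end-of-stream marker (often called a "poison pill"), which is a technique swapped in for this example and not something the answer above prescribes. The class name BlockingQueueDemo and the DONE constant are hypothetical, and the sketch assumes -1 never occurs as real data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Assumption: -1 is never produced as real data, so it can mark the end of the stream.
    static final int DONE = -1;

    static List<Integer> demo() {
        // Bounded queue with capacity 5, mirroring size = 5 in the question.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take();      // blocks while the queue is empty
                    if (item == DONE) {
                        return;                   // producer signalled that it is done
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 12; i++) {
                    queue.put(i);                 // blocks while the queue is full
                }
                queue.put(DONE);                  // tell the consumer to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With several consumers, one marker per consumer would be needed; this sketch only covers the single-consumer case from the question.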

OTHER TIPS

Use a CountDownLatch. It lets one thread wait until another thread has counted the latch down to zero. It's thread-safe and designed specifically for this use case.
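
One way this could look, assuming the number of items is known in advance: the consumer counts the latch down once per consumed item, and the main thread waits for the latch to reach zero before interrupting the consumer. The names LatchDemo, remaining and demo are hypothetical, and the queue type is an assumption made to keep the sketch short.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Returns the sum of all consumed items.
    static int demo() {
        final int items = 12;                               // assumed to be known up front
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        CountDownLatch remaining = new CountDownLatch(items);
        AtomicInteger sum = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    sum.addAndGet(queue.take());            // blocks while the queue is empty
                    remaining.countDown();                  // one item fully processed
                }
            } catch (InterruptedException e) {
                // Interrupted by the main thread after the latch hit zero: exit.
            }
        });
        consumer.start();

        for (int i = 0; i < items; i++) {
            queue.add(i);                                   // unbounded queue, never blocks
        }

        try {
            remaining.await();                              // wait until every item was consumed
            consumer.interrupt();                           // consumer is idle in take(); stop it
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The latch only tells the waiting thread that the work is complete; something else (here, interrupt()) still has to make the blocked consumer exit.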

If you are going to use a boolean, as suggested in one of the comments, use an AtomicBoolean.
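
A small sketch of the AtomicBoolean variant, assuming a single consumer that polls with a timeout instead of blocking indefinitely. The names AtomicFlagDemo, producerDone and demo are hypothetical, and the 50 ms timeout is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class AtomicFlagDemo {
    static List<Integer> demo() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean producerDone = new AtomicBoolean(false);
        List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                // Stop only when the producer is done AND the queue has been drained.
                while (!(producerDone.get() && queue.isEmpty())) {
                    Integer item = queue.poll(50, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        consumed.add(item);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < 12; i++) {
            queue.add(i);
        }
        producerDone.set(true);                 // set only after the last item was queued

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The timed poll() means the consumer notices the flag within one timeout period without needing an interrupt.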

In general, avoid using language primitives such as synchronized or volatile and instead use the higher-level constructs provided by the java.util.concurrent package. If you are going to go low level, you'll need a firm understanding of the semantics.

If you want to reuse rather than reinvent, you might like to use my concurrent processing iterable: https://github.com/jillesvangurp/iterables-support/blob/master/src/main/java/com/jillesvangurp/iterables/ConcurrentProcessingIterable.java

You simply foreach over the input and it concurrently produces the output with as many threads as you need.

Licensed under: CC-BY-SA with attribution