Frage

Ich möchte eine Art von Producer/Consumer Threading App erstellen. Aber ich bin mir nicht sicher, was der beste Weg, um eine Warteschlange zwischen den beiden zu implementieren.

Also ich habe einige sich mit zwei Ideen (die beide völlig falsch sein könnte). Ich möchte wissen, was besser wäre und wenn sie beide dann saugen, was wäre der beste Weg, um die Warteschlange zu implementieren. Es ist in erster Linie meine Implementierung der Warteschlange in diesen Beispielen, dass ich bin besorgt. Ich bin eine Queue Klasse erstreckt, die in eine Haus-Klasse und sind threadsicher. Im Folgenden sind zwei Beispiele mit 4 Klassen je.

Hauptklasse -

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

Verbraucherklasse -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

Der Produzent Klasse -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

Klasse Queue -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

oder

Hauptklasse -

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

Verbraucherklasse -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

Der Produzent Klasse -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

Klasse Queue -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

Und gehen Sie!

War es hilfreich?

Lösung

Java 5+ hat alle Werkzeuge, die Sie für diese Art der Sache müssen. Sie wollen:

  1. Legen Sie alle Ihre Hersteller in einem ExecutorService ;
  2. Legen Sie alle Ihre Verbraucher in einem anderen ExecutorService;
  3. Falls erforderlich, kommunizieren zwischen den beiden mit einem BlockingQueue .

Ich sage „wenn nötig“ für (3), weil aus meiner Erfahrung ist es ein unnötiger Schritt. Alles, was Sie tun, ist es, neue Aufgaben für den Verbraucher Testamentsvollstrecker Dienst vorzulegen. Also:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

So legt die producers direkt an consumers.

Andere Tipps

OK, wie andere beachten Sie, dass das Beste, was zu tun ist, java.util.concurrent Paket zu verwenden. Ich empfehle "Java Concurrency in Practice" hoch. Es ist ein großes Buch, das deckt fast alles, was Sie wissen müssen.

Wie für Ihre spezielle Implementierung, wie ich in den Kommentaren erwähnt, nicht Themen von Konstrukteurs starten -. Es gefährlich sein kann,

verlassen, dass abgesehen, scheint die zweite Implementierung besser. Sie wollen nicht, Warteschlangen einzureihen in statischen Feldern. Sie sind wahrscheinlich nur Flexibilität für nichts zu verlieren.

Wenn Sie im Voraus mit Ihrer eigenen Implementierung gehen wollen (für das Lernen Zweck Ich denke,?), Ein start() Verfahren zumindest liefern. Sie sollten das Objekt erstellen (Sie können die Thread Objekt instanziiert kann) und dann start() rufen Sie den Thread zu starten.

Edit: ExecutorService ihre eigene Warteschlange hat so diese verwirrend .. etwas Hier können Sie den Einstieg.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

Weitere EDIT: Für Produzenten, statt while(true), können Sie so etwas wie:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

Auf diese Weise können Sie Herunterfahren der Testamentsvollstrecker von .shutdownNow() aufrufen. Wenn Sie while(true) verwenden würde, wird es nicht ausgeschaltet.

Beachten Sie auch, dass die Producer zu RuntimeExceptions immer noch anfällig ist (das heißt ein RuntimeException wird die Verarbeitung stoppen)

Sie das Rad neu erfinden.

Wenn Sie Persistenz und andere Enterprise-Funktionen verwenden JMS (ich würde vorschlagen < a href = "http://activemq.apache.org/" rel = "nofollow noreferrer"> ActiveMQ ).

Wenn Sie schnell in-Speicher-Warteschlangen verwenden Sie eine der impementations von Java Queue .

Wenn Sie Unterstützung benötigen Java 1.4 oder früher, verwenden Doug Lea ausgezeichnet Concurrent Paket.

Ich habe cletus vorgeschlagene Antwort auf Arbeitscodebeispiel erweitert.

  1. Ein ExecutorService (pes) übernimmt Producer Aufgaben.
  2. Ein ExecutorService (ces) akzeptiert Consumer Aufgaben.
  3. Sowohl Producer und Consumer Aktien BlockingQueue.
  4. Mehrere Producer Aufgaben erzeugt unterschiedliche Nummern.
  5. Jede Consumer Aufgaben Anzahl von Producer erzeugt verbrauchen

Code:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

Ausgabe:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

Hinweis. Wenn Sie nicht mehrere Hersteller und Verbraucher brauchen, halten einzelne Erzeuger und Verbraucher. Ich habe mehrere Produzenten und Konsumenten zu präsentieren Fähigkeiten von Blocking unter mehreren Produzenten aufgenommen und Verbrauchern.

Dies ist ein sehr einfacher Code.

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue not empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}
  1. Java-Code „Blocking“, die Put-synchronisiert hat und Verfahren erhalten.
  2. Java-Code "Producer", Produzent Thread zu produzieren Daten.
  3. Java-Code "Consumer", Verbraucher-Thread die Daten erzeugt zu konsumieren.
  4. Java-Code "ProducerConsumer_Main", Hauptfunktion des Erzeuger- und Verbraucher Thread zu starten.

BlockingQueue.java

public class BlockingQueue 
{
    int item;
    boolean available = false;

    public synchronized void put(int value) 
    {
        while (available == true)
        {
            try 
            {
                wait();
            } catch (InterruptedException e) { 
            } 
        }

        item = value;
        available = true;
        notifyAll();
    }

    public synchronized int get()
    {
        while(available == false)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e){
            }
        }

        available = false;
        notifyAll();
        return item;
    }
}

Consumer.java

package com.sukanya.producer_Consumer;

public class Consumer extends Thread
{
    blockingQueue queue;
    private int number;
    Consumer(BlockingQueue queue,int number)
    {
        this.queue = queue;
        this.number = number;
    }

    public void run()
    {
        int value = 0;

        for (int i = 0; i < 10; i++) 
        {
            value = queue.get();
            System.out.println("Consumer #" + this.number+ " got: " + value);
        }
    }
}

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer;

public class ProducerConsumer_Main 
{
    public static void main(String args[])
    {
        BlockingQueue queue = new BlockingQueue();
        Producer producer1 = new Producer(queue,1);
        Consumer consumer1 = new Consumer(queue,1);
        producer1.start();
        consumer1.start();
    }
}
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top