Question

I'm learning concurrency in Java and have gone through the tutorials on the Oracle website. While I have understood some of it, a greater portion eludes me. I was thinking of a hypothetical problem (though it may or may not be a good case for using multiple threads) where I have 100 text files and need to search for a particular word in all of them. Suppose I implement a blocking queue and do not want to use a thread pool with an ExecutorService:

  1. How should I go about solving this problem (thinking algorithmically)?
  2. How should I do it if I want to use a BlockingQueue with a multiple-producer, multiple-consumer model, where 100 threads put() the contents of the 100 text files into the BlockingQueue and another 100 threads take() those contents and search them for a particular word?

What I have written may or may not make sense, but I'm just a beginner and want to learn more about this, as this problem has come up frequently in programming interviews.


Solution

Great question! I've written a small example (it only uses 6 threads, but can easily be expanded) to illustrate how you could read multiple files (one thread to read each file) and process the data with multiple threads.

So let's begin with the Controller, which is basically the director in charge of creating and managing the other threads. It gives each thread a reference to the queue, which is what lets the threads do their work: either adding items to the queue or removing items from it. It also keeps two collections of threads: one for the producer threads and another for all of the threads. The consumers use the producer collection to find out whether they should keep waiting for more input. The collection holding all the threads lets the controller join() each one, so the controller does not finish before all producers and consumers have completed their work.

package multithreading.producer_consumer.blockingQueue;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class Controller {

    private static final int NUMBER_OF_CONSUMERS = 3;
    private static final int NUMBER_OF_PRODUCERS = 3;
    private static final int QUEUE_SIZE = 2;
    private static BlockingQueue<String> queue;
    private static Collection<Thread> producerThreadCollection, allThreadCollection;

    public static void main(String[] args) {
        producerThreadCollection = new ArrayList<Thread>();
        allThreadCollection = new ArrayList<Thread>();
        queue = new LinkedBlockingDeque<String>(QUEUE_SIZE);

        createAndStartProducers();
        createAndStartConsumers();

        for(Thread t: allThreadCollection){
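            try {
                t.join(); // wait until this producer or consumer has finished
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting instead of swallowing it.
                Thread.currentThread().interrupt();
                break;
            }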
        }

        System.out.println("Controller finished");
    }

    private static void createAndStartProducers(){
        for(int i = 1; i <= NUMBER_OF_PRODUCERS; i++){
            Producer producer = new Producer(Paths.get("./src/multithreading/producer_consumer/blockingQueue/file"+i+".txt"), queue);
            Thread producerThread = new Thread(producer,"producer-"+i);
            producerThreadCollection.add(producerThread);
            producerThread.start();
        }
        allThreadCollection.addAll(producerThreadCollection);
    }

    private static void createAndStartConsumers(){
        for(int i = 0; i < NUMBER_OF_CONSUMERS; i++){
            Thread consumerThread = new Thread(new Consumer(queue), "consumer-"+i);
            allThreadCollection.add(consumerThread);
            consumerThread.start();
        }
    }

    public static boolean isProducerAlive(){
        for(Thread t: producerThreadCollection){
            if(t.isAlive())
                return true;
        }
        return false;
    }
}
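If you would rather not scan the producer threads with isAlive(), one alternative (a different technique from the one used above) is a CountDownLatch that each producer counts down when it finishes. The following is a minimal sketch under that assumption; ProducerLatch, producersFinished and startProducersAndJoin are hypothetical names, and the producers' file-reading work is left as a placeholder comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical alternative to Controller.isProducerAlive(): a latch with one count per producer.
public class ProducerLatch {

    /** Non-blocking check a consumer could call: true once every producer has counted down. */
    public static boolean producersFinished(CountDownLatch producersDone) {
        try {
            // With a zero timeout, await() never waits: it returns true if the count
            // is already zero and false otherwise.
            return producersDone.await(0, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return false;
        }
    }

    /** Starts n placeholder producers, joins them, then checks the latch. */
    public static boolean startProducersAndJoin(int n) {
        CountDownLatch producersDone = new CountDownLatch(n);
        List<Thread> producers = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            Thread producer = new Thread(() -> {
                try {
                    // ... read one file and put() its lines on the queue here ...
                } finally {
                    producersDone.countDown(); // runs even if reading the file throws
                }
            }, "producer-" + i);
            producers.add(producer);
            producer.start();
        }
        for (Thread producer : producers) {
            try {
                producer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return producersFinished(producersDone);
    }

    public static void main(String[] args) {
        System.out.println(startProducersAndJoin(3)); // prints true
    }
}
```

A consumer would call producersFinished(latch) where the version above calls Controller.isProducerAlive(). Counting down in a finally block means that a producer which fails with an exception still releases its count, so the consumers are not left waiting forever.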

Next, here's the Producer class, which is used to create the threads whose job is to read one file each. Each producer reads its file line by line and adds the lines to the queue with the put method, which blocks while the queue is full.

package multithreading.producer_consumer.blockingQueue;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable{

    private Path fileToRead;
    private BlockingQueue<String> queue;

    public Producer(Path filePath, BlockingQueue<String> q){
        fileToRead = filePath;
        queue = q;
    }

    @Override
    public void run() {
        // try-with-resources closes the reader even if an exception is thrown
        try (BufferedReader reader = Files.newBufferedReader(fileToRead)) {
            String line;
            while((line = reader.readLine()) != null){
                queue.put(line); // blocks while the queue is full
                System.out.println(Thread.currentThread().getName() + " added \"" + line + "\" to queue, queue size: " + queue.size());
            }

            System.out.println(Thread.currentThread().getName()+" finished");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop producing instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

}
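Because the Controller creates the queue with a capacity of QUEUE_SIZE (2), put() blocks whenever two lines are already waiting, which keeps fast producers from running far ahead of the consumers. A small sketch of that behavior follows; it uses a timed offer() so the demo returns instead of blocking forever, and BoundedQueueDemo and extraOfferAccepted are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    /** Fills a queue of the given capacity, then reports whether one extra element fits. */
    public static boolean extraOfferAccepted(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("line " + i); // succeeds: there is still room
        }
        try {
            // put() would block here until a consumer removed something;
            // a timed offer() gives up after 50 ms and returns false instead.
            return queue.offer("one too many", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(extraOfferAccepted(2)); // prints false: the queue is full
    }
}
```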

Finally, here is the Consumer class, which is responsible for taking data from the queue and processing it. Notice that this class does not use the take method. I wrote it this way so that the program ends after all the files have been processed. If you want the consumers to stay alive, you could replace poll with take (along with a few other minor adjustments to the run method, such as handling the InterruptedException that might be thrown while waiting for take to return a value).

package multithreading.producer_consumer.blockingQueue;

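import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable{
    private BlockingQueue<String> queue;

    public Consumer(BlockingQueue<String> q){
        queue = q;
    }

    public void run(){
        try {
            while(true){
                // Check the producers *before* polling: if they had all finished and the
                // queue is still empty afterwards, no more lines can ever arrive.
                boolean producersDone = !Controller.isProducerAlive();
                // Wait up to 100 ms for a line instead of spinning on a non-blocking poll().
                String line = queue.poll(100, TimeUnit.MILLISECONDS);

                if(line == null && producersDone)
                    return;

                if(line != null){
                    System.out.println(Thread.currentThread().getName()+" processing line: "+line);
                    //Do something with the line here like see if it contains a string
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop consuming.
            Thread.currentThread().interrupt();
        }
    }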
}
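As mentioned above, the consumers could use take instead of poll if they had another way to learn that the input is exhausted. One common way, a different technique from the isProducerAlive() check used here, is a "poison pill": after every producer has been joined, the controller puts one sentinel value per consumer on the queue, and a consumer stops when it takes one. The sketch below assumes in-memory lists of lines in place of files so that it runs on its own, and it fills in the "see if it contains a string" step with a whole-word regular-expression match. PoisonPillSearch, POISON_PILL, containsWord and countMatches are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

public class PoisonPillSearch {

    // Hypothetical sentinel, compared by reference so that no real line can equal it.
    private static final String POISON_PILL = new String("<end-of-input>");

    /** True if line contains word as a whole word ("file" matches "file #1" but not "profile"). */
    public static boolean containsWord(String line, String word) {
        return Pattern.compile("\\b" + Pattern.quote(word) + "\\b").matcher(line).find();
    }

    /** One producer thread per "file" (a list of lines), plus take()-based consumers. */
    public static int countMatches(List<List<String>> files, String word, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(2);
        AtomicInteger matches = new AtomicInteger();
        List<Thread> producerThreads = new ArrayList<>();
        List<Thread> consumerThreads = new ArrayList<>();

        for (List<String> lines : files) {
            Thread producer = new Thread(() -> {
                try {
                    for (String line : lines) {
                        queue.put(line); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producerThreads.add(producer);
            producer.start();
        }

        for (int i = 0; i < consumers; i++) {
            Thread consumer = new Thread(() -> {
                try {
                    while (true) {
                        String line = queue.take(); // blocks instead of spinning
                        if (line == POISON_PILL) {  // reference comparison on purpose
                            return;
                        }
                        if (containsWord(line, word)) {
                            matches.incrementAndGet();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads.add(consumer);
            consumer.start();
        }

        try {
            for (Thread producer : producerThreads) {
                producer.join(); // afterwards, every real line has been put on the queue
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON_PILL); // one pill per consumer, queued behind all real lines
            }
            for (Thread consumer : consumerThreads) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while searching", e);
        }
        return matches.get();
    }

    public static void main(String[] args) {
        List<List<String>> files = List.of(
                List.of("file #1 line 1", "file #1 line 2"),
                List.of("This is file #2 line 1", "no match here"));
        System.out.println(countMatches(files, "file", 3)); // prints 3
    }
}
```

Because the queue is FIFO and the pills are added only after every producer has finished, all real lines are taken before any pill, and each consumer stops after taking exactly one.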

Here are the 3 input files I used:

file1.txt

file #1 line 1
file #1 line 2
file #1 line 3
file #1 line 4
file #1 line 5

file2.txt

This is file #2 line 1
This is file #2 line 2
This is file #2 line 3
This is file #2 line 4
This is file #2 line 5

file3.txt

Lastly we have file #3 line 1
Lastly we have file #3 line 2
Lastly we have file #3 line 3
Lastly we have file #3 line 4
Lastly we have file #3 line 5

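Here is some sample output from the program. Note that the log lines do not appear in the exact order in which the queue operations happened: each thread first calls put or poll and then, as a separate step, println, so another thread can run in between. For the same reason, the reported queue size is only a snapshot that may already be out of date by the time it is printed.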

consumer-0 processing line: Lastly we have file #3 line 1
consumer-0 processing line: This is file #2 line 1
producer-2 added "This is file #2 line 1" to queue, queue size: 1
producer-2 added "This is file #2 line 2" to queue, queue size: 1
producer-2 added "This is file #2 line 3" to queue, queue size: 1
producer-2 added "This is file #2 line 4" to queue, queue size: 2
consumer-1 processing line: file #1 line 1
consumer-1 processing line: This is file #2 line 4
consumer-1 processing line: This is file #2 line 5
producer-1 added "file #1 line 1" to queue, queue size: 1
producer-1 added "file #1 line 2" to queue, queue size: 0
producer-3 added "Lastly we have file #3 line 1" to queue, queue size: 0
producer-1 added "file #1 line 3" to queue, queue size: 1
consumer-1 processing line: file #1 line 2
producer-2 added "This is file #2 line 5" to queue, queue size: 0
producer-1 added "file #1 line 4" to queue, queue size: 2
producer-2 finished
consumer-2 processing line: This is file #2 line 3
consumer-2 processing line: Lastly we have file #3 line 2
consumer-2 processing line: file #1 line 4
consumer-2 processing line: file #1 line 5
consumer-0 processing line: This is file #2 line 2
producer-1 added "file #1 line 5" to queue, queue size: 0
producer-1 finished
consumer-1 processing line: file #1 line 3
producer-3 added "Lastly we have file #3 line 2" to queue, queue size: 2
producer-3 added "Lastly we have file #3 line 3" to queue, queue size: 1
producer-3 added "Lastly we have file #3 line 4" to queue, queue size: 1
producer-3 added "Lastly we have file #3 line 5" to queue, queue size: 0
producer-3 finished
consumer-0 processing line: Lastly we have file #3 line 3
consumer-2 processing line: Lastly we have file #3 line 5
consumer-1 processing line: Lastly we have file #3 line 4
Controller finished
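Because interleaved log lines like these are hard to read, one option, assuming you only need the final search results rather than a live log, is to record matches in a thread-safe collection and sort and print them only after every worker has been joined. This is a minimal sketch: OrderedReport and searchAll are hypothetical names, the match is a plain substring check, and the queue is filled before any worker starts, so a null from poll() reliably means there is no more work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class OrderedReport {

    /** Searches the lines with several worker threads and returns the matches in sorted order. */
    public static List<String> searchAll(List<String> lines, String word, int workers) {
        // Pre-filled and never added to again, so poll() returning null means "done".
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(lines);
        Queue<String> matches = new ConcurrentLinkedQueue<>(); // safe for concurrent add()
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < workers; i++) {
            Thread worker = new Thread(() -> {
                String line;
                while ((line = queue.poll()) != null) {
                    if (line.contains(word)) {
                        matches.add(line); // no printing here, so nothing interleaves
                    }
                }
            });
            threads.add(worker);
            worker.start();
        }

        for (Thread worker : threads) {
            try {
                worker.join(); // join() also makes the worker's additions visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }

        List<String> report = new ArrayList<>(matches);
        report.sort(null); // natural order: the same result regardless of thread scheduling
        return report;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("file #1 line 2", "This is file #2 line 1", "file #1 line 1");
        searchAll(lines, "file #1", 3).forEach(System.out::println);
    }
}
```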

I hope this is helpful in illustrating how you could accomplish your task without using ExecutorService. Have fun!

Licensed under: CC-BY-SA with attribution