Question

I am trying to figure out how to use the types from the java.util.concurrent package to parallelize processing of all the files in a directory.

I am familiar with the multiprocessing package in Python, which is very simple to use, so ideally I am looking for something similar:

public interface FictionalFunctor<T>{
  void handle(T arg);
}

public class FictionalThreadPool {
  public FictionalThreadPool(int threadCount){
    ...
  }
  public <T> FictionalThreadPoolMapResult<T> map(FictionalFunctor<T> functor, List<T> args){
    // Executes the given functor on every arg from args in parallel. Returns when
    // all the parallel branches have returned.
    // FictionalThreadPoolMapResult lets the caller abort the whole mapping process, at the least.
  }
}

dir = getDirectoryToProcess();
pool = new FictionalThreadPool(10);   // 10 threads in the pool
pool.map(new FictionalFunctor<File>(){ 
  @Override
  public void handle(File file){
    // process the file
  }
}, dir.listFiles());

I have a feeling that the types in java.util.concurrent allow me to do something similar, but I have absolutely no idea where to start.

Any ideas?

Thanks.

EDIT 1

Following the advice given in the answers, I have written something like this:

public void processAllFiles() throws IOException {
  ExecutorService exec = Executors.newFixedThreadPool(6);
  BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>(5); // Figured we can keep the contents of 6 files simultaneously.
  exec.submit(new MyCoordinator(exec, tasks));
  for (File file : dir.listFiles(getMyFilter())) {
    try {
      tasks.add(new MyTask(file));
    } catch (IOException exc) {
      System.err.println(String.format("Failed to read %s - %s", file.getName(), exc.getMessage()));
    }
  }
}

public class MyTask implements Runnable {
  private final byte[] m_buffer;
  private final String m_name;

  public MyTask(File file) throws IOException {
    m_name = file.getName();
    m_buffer = Files.toByteArray(file);
  }

  @Override
  public void run() {
    // Process the file contents
  }
}

private class MyCoordinator implements Runnable {
  private final ExecutorService m_exec;
  private final BlockingQueue<Runnable> m_tasks;

  public MyCoordinator(ExecutorService exec, BlockingQueue<Runnable> tasks) {
    m_exec = exec;
    m_tasks = tasks;
  }

  @Override
  public void run() {
    while (true) {
      Runnable task = m_tasks.remove();
      m_exec.submit(task);
    }
  }
}

How I thought the code works is:

  1. The files are read one after another.
  2. Each file's contents are saved in a dedicated MyTask instance.
  3. A blocking queue with a capacity of 5 holds the tasks. I am counting on the server being able to hold the contents of at most 6 files at a time: 5 in the queue and one more fully initialized task waiting to enter the queue.
  4. A special MyCoordinator task fetches the file tasks from the queue and dispatches them to the same pool.

OK, so there is a bug - more than 6 tasks can be created. Some will be submitted even though all the pool threads are busy. I plan to solve that later.

The problem is that it does not work at all. The MyCoordinator thread blocks on the first remove - this is fine. But it never unblocks, even though new tasks have been placed in the queue. Can anyone tell me what I am doing wrong?


Solution

The thread pool you are looking for is provided by the ExecutorService interface. You can create a fixed-size thread pool with Executors.newFixedThreadPool. This makes it easy to implement a producer-consumer pattern, because the pool encapsulates the queue and the worker threads for you:

ExecutorService exec = Executors.newFixedThreadPool(10);

You can then submit tasks in the form of objects whose type implements Runnable (or Callable if you want to also get a result):

class ThreadTask implements Runnable {
    public void run() {
       // task code
    }
}

...

exec.submit(new ThreadTask());
// alternatively, using an anonymous type
exec.submit(new Runnable() {
           public void run() {
              // task code
           }
      });
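The question also asks for a call that returns only when all the parallel branches have finished. One possible way to get that with an ExecutorService is to submit every task, then call shutdown() followed by awaitTermination(). The following is only a sketch: the class name PoolMapSketch, the method processAll, the one-minute timeout, and the counter standing in for real work are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of java.util.concurrent.
class PoolMapSketch {
    // Runs one task per item on a fixed-size pool, waits for all of them,
    // and returns how many tasks ran to completion.
    static int processAll(List<String> items, int threadCount) {
        ExecutorService exec = Executors.newFixedThreadPool(threadCount);
        AtomicInteger done = new AtomicInteger();
        for (String item : items) {
            exec.submit(() -> {
                // Placeholder for the real per-item work.
                done.incrementAndGet();
            });
        }
        exec.shutdown(); // no new tasks accepted; submitted tasks still run
        try {
            // The timeout value is an arbitrary choice for this sketch.
            if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
                exec.shutdownNow(); // timed out: interrupt workers, drop queued tasks
            }
        } catch (InterruptedException e) {
            exec.shutdownNow(); // abort the whole run if the caller is interrupted
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("a.txt", "b.txt", "c.txt"), 2));
    }
}
```

Calling shutdownNow() is also the closest equivalent to aborting the whole mapping process: it interrupts the running workers and discards tasks that have not started yet.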

One important piece of advice on processing multiple files in parallel: if the files live on a single mechanical disk, it is wise to use a single thread to read them one by one and submit each file's contents to a thread pool task, as above, for processing. Do not do the actual reading in parallel, as the extra disk seeks will degrade performance.
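A minimal sketch of that arrangement might look like the following. The calling thread does all the reading, the pool does the processing, and a Semaphore limits how many file buffers are alive at once (the memory concern raised in the question). The names SequentialReadParallelProcess, run, and readOrNull are hypothetical, and summing the byte counts is just a stand-in for real processing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, written for illustration only.
class SequentialReadParallelProcess {
    // Returns the file contents, or null if the file could not be read.
    static byte[] readOrNull(Path file) {
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            System.err.println("Failed to read " + file + " - " + e.getMessage());
            return null;
        }
    }

    // Reads files one at a time on the calling thread, processes them on a pool,
    // and returns the total number of bytes processed.
    static long run(List<Path> files, int workers, int maxFilesInMemory) {
        ExecutorService exec = Executors.newFixedThreadPool(workers);
        Semaphore slots = new Semaphore(maxFilesInMemory);
        AtomicLong total = new AtomicLong();
        try {
            for (Path file : files) {
                slots.acquire(); // reader waits while too many buffers are in memory
                final byte[] data = readOrNull(file);
                if (data == null) {
                    slots.release();
                    continue;
                }
                exec.submit(() -> {
                    try {
                        total.addAndGet(data.length); // placeholder for real processing
                    } finally {
                        slots.release(); // buffer is no longer needed
                    }
                });
            }
            exec.shutdown();
            exec.awaitTermination(1, TimeUnit.MINUTES); // arbitrary timeout for the sketch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdownNow(); // no effect if the pool has already terminated
        }
        return total.get();
    }
}
```

With maxFilesInMemory set to 6, at most 6 file buffers exist at any moment, counting both those waiting in the pool's queue and those being processed.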

OTHER TIPS

A simpler solution than using ExecutorService is to implement your own producer-consumer scheme. Have one thread that creates tasks and puts them on a LinkedBlockingQueue or ArrayBlockingQueue, and have worker threads that take tasks from this queue and run them. You may need a special kind of task, named ExitTask, that forces a worker to exit. Then, at the end of the job, if you have n workers you need to add n ExitTasks to the queue.
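The scheme described above could be sketched roughly as follows. The class name PoisonPillSketch, the EXIT_TASK sentinel, the queue capacity of 5, and the counter used as the "work" are all assumptions for illustration, not a definitive implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-rolled producer-consumer with exit tasks.
class PoisonPillSketch {
    // Sentinel task: a worker that takes this exact instance stops.
    private static final Runnable EXIT_TASK = () -> { };

    // Starts workerCount workers, feeds them taskCount tasks, and
    // returns the number of tasks that were executed.
    static int run(int workerCount, int taskCount) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
        AtomicInteger completed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take(); // blocks while the queue is empty
                        if (task == EXIT_TASK) {
                            return;
                        }
                        task.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            workers.add(t);
        }
        try {
            for (int i = 0; i < taskCount; i++) {
                queue.put(completed::incrementAndGet); // blocks while the queue is full
            }
            for (int i = 0; i < workerCount; i++) {
                queue.put(EXIT_TASK); // one exit task per worker
            }
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }
}
```

Note that the sketch uses take() and put(), which block. On a BlockingQueue, remove() throws NoSuchElementException when the queue is empty and add() throws IllegalStateException when it is full, so neither of them waits.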

Basically, what @Tudor said: use an ExecutorService. I wanted to expand on his code, though, and I always feel strange editing other people's posts. Here's a skeleton of what you would submit to the ExecutorService:

public class MyFileTask implements Runnable {
   final File fileToProcess;

   public MyFileTask(File file) {
      fileToProcess = file;
   }

   public void run() {
      // your code goes here, e.g.
      handle(fileToProcess);
      // if you prefer, implement Callable instead
   }
}

See also my blog post here for some more details if you get stuck

Since processing Files often leads to IOExceptions, I'd prefer a Callable (which can throw a checked Exception) to a Runnable, but YMMV.
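As a rough illustration of the Callable approach, the sketch below submits a batch of tasks with invokeAll and inspects each Future; a checked exception thrown inside call() resurfaces as the cause of an ExecutionException. The class CallableSketch, the taskFor factory, and the "empty name means a failed read" rule are invented for the example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, for illustration only.
class CallableSketch {
    // Pretend task: returns the length of the file name, or throws
    // IOException for an empty name to simulate a failed read.
    static Callable<Integer> taskFor(String name) {
        return () -> {
            if (name.isEmpty()) {
                throw new IOException("cannot read file with empty name");
            }
            return name.length();
        };
    }

    // Runs all tasks and returns one report line per input, in input order.
    static List<String> runAll(List<String> names) {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        List<String> report = new ArrayList<>();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String n : names) {
                tasks.add(taskFor(n));
            }
            // invokeAll blocks until every task has completed.
            for (Future<Integer> f : exec.invokeAll(tasks)) {
                try {
                    report.add("ok " + f.get());
                } catch (ExecutionException e) {
                    // The exception thrown by call() is available as the cause.
                    report.add("failed: " + e.getCause().getMessage());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return report;
    }
}
```

With a Runnable, the same IOException would have to be caught inside run() or wrapped in an unchecked exception.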

Licensed under: CC-BY-SA with attribution