Question

I have a method that takes an array of queries, and I need to run them against different search engine Web API's, such as Google's or Yahoo's. In order to parallelize the process, a thread is spawned for each query, which are then joined at the end, since my application can only continue after I have the results of every query. I currently have something along these lines:

public abstract class class Query extends Thread {
    private String query;

    public abstract Result[] querySearchEngine();
    @Override
    public void run() {
        Result[] results = querySearchEngine(query);
        Querier.addResults(results);
    }

}

public class GoogleQuery extends Query {
    public Result querySearchEngine(String query) { 
        // access google rest API
    }
}

public class Querier {
    /* Every class that implements Query fills this array */
    private static ArrayList<Result> aggregatedResults;

    public static void addResults(Result[]) { // add to aggregatedResults }

    public static Result[] queryAll(Query[] queries) {
        /* for each thread, start it, to aggregate results */
        for (Query query : queries) {
            query.start();
        }
        for (Query query : queries) {
            query.join();
        }
        return aggregatedResults;
    }
}

Recently, I have found that there's a new API in Java for doing concurrent jobs. Namely, the Callable interface, FutureTask and ExecutorService. I was wondering if this new API is the one that should be used, and if they are more efficient than the traditional ones, Runnable and Thread.

After studying this new API, I came up with the following code (simplified version):

   public abstract class Query implements Callable<Result[]> {
        private final String query; // gets set in the constructor

        public abstract Result[] querySearchEngine();
        @Override
        public Result[] call() {
            return querySearchEngine(query);
        }
    }

public class Querier {   
        private ArrayList<Result> aggregatedResults;

        public Result[] queryAll(Query[] queries) {
            List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length);
            final ExecutorService service = Executors.newFixedThreadPool(queries.length);  
            for (Query query : queries) {
                futures.add(service.submit(query));  
            }
            for (Future<Result[]> future : futures) {  
                aggregatedResults.add(future.get());  // get() is somewhat similar to join?
            }  
            return aggregatedResults;
        }
    }

I'm new to this concurrency API, and I'd like to know if there's something that can be improved in the above code, and if it's better than the first option (using Thread). There are some classes which I didn't explore, such as FutureTask, et cetera. I'd love to hear any advice on that as well.

Was it helpful?

Solution

Several problems with your code.

  1. You should probably be using the ExecutorService.invokeAll() method. The cost of creating new threads and a new thread pool can be significant (though maybe not compared to calling external search engines). invokeAll() can manage the threads for you.
  2. You probably don't want to mix arrays and generics.
  3. You are calling aggregatedResults.add() instead of addAll().
  4. You don't need to use member variables when they could be local to the queryAll() function call.

So, something like the following should work:

public abstract class Query implements Callable<List<Result>> {
    private final String query; // gets set in the constructor

    public abstract List<Result> querySearchEngine();
    @Override
    public List<Result> call() {
        return querySearchEngine(query);
    }
}

public class Querier {   
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public List<Result> queryAll(List<Query> queries) {
        List<Future<List<Result>>> futures = executor.submitAll(queries);
        List<Result> aggregatedResults = new ArrayList<Result>();
        for (Future<List<Result>> future : futures) {  
            aggregatedResults.addAll(future.get());  // get() is somewhat similar to join?
        }  
        return aggregatedResults;
    }
}

OTHER TIPS

As a futher improvement, you could look into using a CompletionService It decouples the order of submitting and retrieving, instead placing all the future results on a queue from which you take results in the order they are completed..

Can I suggest you use Future.get() with a timeout ?

Otherwise it'll only take one search engine being unresponsive to bring everything to a halt (it doesn't even need to be a search engine problem if, say, you have a network issue at your end)

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top