Question

I have been trying to parallelize a portion of a method in my code (the function_to_parallelize(...) method of the Example class shown below). I have looked at the executor framework and found that Futures and Callables can be used to create several worker threads that eventually return values. However, the online examples of the executor framework are very simple, and none of them seem to cover my particular case, where the code I'm trying to parallelize needs methods of the class that contains it. Following one Stack Overflow thread, I wrote an external class, Solver, that implements Callable (and therefore its call() method), and I set up the executor framework as shown in function_to_parallelize(...). Some of the computation in each worker thread requires methods such as *subroutine_A(...)* that operate on the data members of the Example class, and some of these subroutines use random numbers for various sampling functions.

My issue is that while my program runs and produces results (sometimes accurate, sometimes not), the combined result of the worker threads' computation is different every time I run it. I figured it must be a shared-memory problem, so I passed copies of every data member of the Example class into the Solver constructor, including the utility object that contains the Random rng. I even copied the subroutines I need directly into the Solver class (even though it could call those methods on Example without doing this). Why would I be getting different values each time? Do I need to implement something such as locking or synchronization?

Alternatively, is there a simpler way to add some parallelization to that method? Rewriting the Example class or drastically changing my class structure is not an option, as I need it in its current form for a variety of other parts of my software.

Below is my code vignette (a heavily abstracted and reduced form meant to show the basic structure and the target area, even if it's a bit longer than usual):

import java.util.*;
import java.util.concurrent.*;

public class Tools{
    Random rng;
    public Tools(Random rng){
        this.rng = rng;
    }
    // ...sampling helpers that draw from rng
}
public class Solver implements Callable<Item>{ // type argument matches call()'s return type
    public Tools toolkit;
    public Item W;
    public Item v;
    Item input;
    double param;

    public Solver(Item input, double param, Item W, Item v, Tools toolkit){
        this.input = input;
        this.param = param;
        //...so on & so forth for rest of arguments
    }       
    public Item call() throws Exception {
        //does computation that utilizes the data members W, v
        //and calls some methods housed in the "toolkit" object
    }       
    public Item subroutine_A(Item in){....}
    public Item subroutine_B(Item in){....}
}
public class Example{
    private static final int NTHREDS = 4;
    public Tools toolkit;
    public Item W;
    public Item v;

    public Example(...,Tools toolkit...){
        this.toolkit = toolkit; ...
    }
    public Item subroutine_A(Item in){
        // some of its internal computation involves sampling & random # generation using
        // a call to toolkit, which houses functions that use the initialize Random rng
        ...
    }
    public Item subroutine_B(Item in){....}
    public void function_to_parallelize(Item input, double param,...){
        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        List<Future<Item>> list = new ArrayList<Future<Item>>();
        while(some_stopping_condition){
            // extract subset of input and feed into Solver constructor below
            Callable<Item> worker = new Solver(input, param, W, v, toolkit);
            Future<Item> submit = executor.submit(worker);
            list.add(submit);
        }
        for(Future<Item> future : list){
            try {
                Item out = future.get();
                // update W via some operation using "out" (like multiplying matrices for example)
            }catch(InterruptedException e) {
                e.printStackTrace();
            }catch(ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown(); // properly terminate the threadpool
    }
}

ADDENDUM: While flob's answer below did address a problem with my vignette/code (make sure your code waits for all tasks to finish, e.g. with latch.await(), before using their results), the issue did not go away after I made this correction. It turns out that the problem lies in how Random behaves with threads. The threads are scheduled in a different order on every run (by the OS scheduler), so the tasks draw numbers from the shared Random in a different order each time, and the combined result is not deterministic. I examined ThreadLocalRandom, the per-thread variant of Random (and used it to gain a bit more efficiency), but alas it does not allow you to set the seed. Still, I highly recommend it as the RNG for anyone who wants random computations inside their worker threads.
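If reproducibility matters more than the convenience of ThreadLocalRandom, one option (a technique not taken from this thread) is to stop sharing a single Random: give each task its own Random, seeded from a fixed base seed plus the task's index, and combine results in submission order. Then neither the generator's state nor the order of the final sum depends on thread scheduling. The sketch below is hedged: `BASE_SEED`, `sampleTask` and `runOnce` are hypothetical stand-ins for the Solver's sampling work, and consecutive seeds are simply the easiest scheme to show (`SplittableRandom.split()`, called in submission order, is another way to derive per-task generators).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    static final long BASE_SEED = 42L; // hypothetical fixed seed

    // Hypothetical stand-in for a Solver: draws samples only from its own private Random.
    public static double sampleTask(Random rng) {
        double sum = 0.0;
        for (int i = 0; i < 1000; i++) {
            sum += rng.nextGaussian();
        }
        return sum;
    }

    public static double runOnce(int numTasks) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Future<Double>> futures = new ArrayList<>();
            for (int t = 0; t < numTasks; t++) {
                // The seed depends only on the task index, not on which thread runs the task or when.
                Random taskRng = new Random(BASE_SEED + t);
                Callable<Double> task = () -> sampleTask(taskRng);
                futures.add(executor.submit(task));
            }
            double total = 0.0;
            for (Future<Double> f : futures) {
                total += f.get(); // combined in submission order, so rounding is repeatable as well
            }
            return total;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce(8) == runOnce(8)); // true: same seeds, same combination order
    }
}
```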


Solution

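The problem I see is that you don't wait for all the tasks to finish before updating W, and because of that some of the Callable instances will get the updated W instead of the one you were expecting. (Every Solver holds a reference to the same W object, so this happens whenever the update modifies W in place.)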

At this point, W is updated even if not all tasks have finished:

    // update W via some operation using "out" (like multiplying matrices for example)

The tasks that have not finished yet will use the W updated above instead of the one you expect.
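This hazard can be reproduced deterministically by forcing the interleaving. In the sketch below, a `double[]` stands in for W (an assumption: the real Item is taken to be mutable and updated in place), and a CountDownLatch makes the task read W only after the main thread has modified it. Reading through the shared reference shows the new value; reading through a copy taken at submission time does not. `demo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    // Returns {value seen through the shared array, value seen through a private copy}.
    public static double[] demo() throws Exception {
        double[] w = {1.0};          // stand-in for the shared W
        double[] wCopy = w.clone();  // snapshot taken when the task is created
        CountDownLatch updated = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<double[]> seen = executor.submit(() -> {
                updated.await(); // force the task to read W only after the update below
                return new double[] {w[0], wCopy[0]};
            });
            w[0] = 99.0;         // in-place "update W" while the task is still pending
            updated.countDown(); // writes before countDown() are visible after await() returns
            return seen.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        double[] r = demo();
        System.out.println(r[0] + " " + r[1]); // 99.0 1.0
    }
}
```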

A quick solution (if you know how many Solver tasks you'll have) is to use a CountDownLatch to wait until all the tasks have finished:

public void function_to_parallelize(Item input, double param,...) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    List<Future<Item>> list = new ArrayList<Future<Item>>();

    // one count per Solver task that the loop below will submit
    CountDownLatch latch = new CountDownLatch(<number_of_tasks_created_in_next_loop>);

    while(some_stopping_condition){
        // extract subset of input and feed into Solver constructor below
        // (Solver now also receives the latch)
        Callable<Item> worker = new Solver(input, param, W, v, toolkit, latch);

        Future<Item> submit = executor.submit(worker);
        list.add(submit);
    }
    latch.await(); // blocks until every Solver has called countDown()
    for(Future<Item> future : list){
        try {
            Item out = future.get();
            // update W via some operation using "out" (like multiplying matrices for example)
        }catch(InterruptedException e) {
            e.printStackTrace();
        }catch(ExecutionException e) {
            e.printStackTrace();
        }
    }
    executor.shutdown(); // properly terminate the threadpool
}

Then, in the Solver class, decrement the latch when the call() method ends:

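// in Solver: a field set from the new constructor argument
private final CountDownLatch latch;

public Item call() throws Exception {
    try {
        //does computation that utilizes the data members W, v
        //and calls some methods housed in the "toolkit" object
    } finally {
        latch.countDown(); // in finally, so latch.await() cannot hang if the computation throws
    }
}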
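An alternative that also works when the number of tasks is not known in advance (not part of the original answer) is `ExecutorService.invokeAll`, which runs a collection of Callables and returns their Futures only once every task has completed. W can then be updated with no task still reading it, and no latch or Solver constructor change is needed. Because the tasks in this sketch are lambdas, they call the enclosing object's method directly. `Model`, `subroutineA`, `functionToParallelize` and the `double[]` standing in for W are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    // Hypothetical stand-in for Example: W is mutable and updated after each round.
    public static class Model {
        private final double[] w = {1.0};

        // Stand-in for subroutine_A: reads the current W and one chunk of input.
        private double subroutineA(double chunk) {
            return w[0] * chunk;
        }

        public void functionToParallelize(List<Double> chunks, ExecutorService executor) throws Exception {
            List<Callable<Double>> tasks = new ArrayList<>();
            for (double chunk : chunks) {
                tasks.add(() -> subroutineA(chunk)); // the lambda calls this object's own method
            }
            // invokeAll returns only after every task has completed (normally or exceptionally).
            List<Future<Double>> futures = executor.invokeAll(tasks);
            for (Future<Double> f : futures) {
                w[0] += f.get(); // safe: no task is still reading W
            }
        }

        public double currentW() {
            return w[0];
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            Model m = new Model();
            m.functionToParallelize(Arrays.asList(1.0, 2.0, 3.0), executor);
            System.out.println(m.currentW()); // 7.0 (1.0 + 1.0*1 + 1.0*2 + 1.0*3)
        } finally {
            executor.shutdown();
        }
    }
}
```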
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow