Question

I have implemented multithreading in my service layer and want to make sure the threads are handled properly in every case. I don't want an exception (such as a RuntimeException or InterruptedException) to leave my app in a bad state.

My code is below. Let me know if you can see any errors. Recommendations are most welcome. I'm using Java 6.

public class MyRunnable implements Runnable {

    private List<MyData> list;
    private Person p;

    public MyRunnable(List<MyData> list, Person p) {
        this.list = list;  // this list is passed in and cannot be null
        this.p = p;
    }

    @Override
    public void run() {
        // before calling any of the services that gets data from the
        // database, check if the thread has been interrupted
        if (Thread.interrupted()) return;

        List<TypeA> aList;
        try {
            aList = getTypeAFromDatabase1(p);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        if (Thread.interrupted()) return;

        List<TypeB> bList;
        try {
            bList = getTypeBFromDatabase2(p);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        if (Thread.interrupted()) return;

        List<TypeC> cList;
        try {
            cList = getTypeCFromSomeWebService(p);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        MyData d = new MyData();
        d.setPerson(p);
        d.setTypeA(aList);
        d.setTypeB(bList);
        d.setTypeC(cList);
        list.add(d);
    }
}

Service that uses Runnable:

@JsonOperation
public static List<MyData> getMyData(MyParams params) throws Exception {

    List<Person> persons = params.getPersonList();

    try {
        // use synchronized list since all threads will add to this list
        List<MyData> retList = Collections.synchronizedList(new ArrayList<MyData>());

        List<Thread> threads = new ArrayList<Thread>();

        // For each person, start a new thread. If there are any runtime
        // exceptions thrown by any one thread, it will be caught by the
        // bigger try catch block. In case of runtime exception, we will
        // return back to the client right away but the other threads
        // are still processing
        try {

            for (Person p : persons) {
                // create a thread per person and start it
                Runnable task = new MyRunnable(retList, p);
                Thread worker = new Thread(task);
                threads.add(worker);
                worker.start();

                // remember the thread for later use
                threads.add(worker);
            }

            for (Thread thread : threads) {
                // wait for all threads (by waiting on one thread at a time)
                thread.join(3000);  //3 seconds between checking on this thread
            }

        } catch (RuntimeException e) {
            log.error(e);
            for (Thread thread : threads) {
                // try and send an interrupt to all threads so that they
                // don't fetch any more data from the database
                thread.interrupt();
            }
            throw e;
        }

        return retList;

    } catch (Exception e) {
        log.error(e);
        throw e;
    }
}

Solution

In most situations it is easier to use tasks instead of threads. You start with an ExecutorService, which restricts the number of threads and is shared across all service operations:

// inject with IoC framework
ExecutorService executor = Executors.newFixedThreadPool(10);
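
To illustrate how a fixed pool caps the number of threads, here is a minimal sketch. The class name SharedPoolDemo and the method threadsUsed are hypothetical, and the pool is created and shut down locally only to keep the example self-contained; in the service it would be the shared, injected executor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedPoolDemo {

    // Runs taskCount short tasks on a pool of poolSize threads and
    // returns the names of the worker threads that executed them.
    static Set<String> threadsUsed(int poolSize, int taskCount) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            final Set<String> names = Collections.synchronizedSet(new HashSet<String>());
            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(new Callable<Void>() {
                    public Void call() throws Exception {
                        names.add(Thread.currentThread().getName());
                        Thread.sleep(20); // simulate a short unit of work
                        return null;
                    }
                });
            }
            // invokeAll without a timeout waits until every task has completed
            for (Future<Void> f : executor.invokeAll(tasks)) {
                f.get();
            }
            return names;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // 20 tasks, but never more than 3 distinct worker threads
        System.out.println(threadsUsed(3, 20).size() + " thread(s) used");
    }
}
```

However many persons a request contains, the work is queued and executed by at most poolSize threads, unlike the thread-per-person approach in the question.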

You use the method invokeAll to execute a task for each person. If the tasks do not finish within the given period, the remaining tasks are cancelled automatically, and calling get on the corresponding future throws a CancellationException. Likewise, if a task itself fails, get throws an ExecutionException that wraps the original exception. That means no additional exception handling is needed: the first such exception simply propagates out of getMyData to the caller.

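public List<MyData> getMyData(MyParams params) throws Exception {
    // explicit type arguments: the diamond operator requires Java 7
    List<Callable<MyData>> tasks = new ArrayList<Callable<MyData>>();
    // p must be final to be captured by the anonymous class in Java 6
    for (final Person p : params.getPersonList()) {
        tasks.add(new Callable<MyData>() { // use a lambda in Java 8
            // declaring "throws Exception" lets checked exceptions from
            // the data access methods propagate through the future
            public MyData call() throws Exception {
                MyData d = new MyData();
                d.setPerson(p);
                d.setTypeA(getTypeAFromDatabase1(p));
                d.setTypeB(getTypeBFromDatabase2(p));
                d.setTypeC(getTypeCFromSomeWebService(p));
                return d;
            }
        });
    }
    List<MyData> result = new ArrayList<MyData>();
    // invokeAll returns the futures in the same order as the tasks
    for (Future<MyData> future : executor.invokeAll(tasks, 3000, TimeUnit.MILLISECONDS)) {
        // throws CancellationException on timeout, ExecutionException on failure
        result.add(future.get());
    }
    return result;
}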
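
If you would rather inspect each outcome than let the first exception propagate, the following self-contained sketch shows the three cases a future returned by invokeAll can end in. The class InvokeAllDemo and its helpers task and collect are hypothetical names, and Thread.sleep stands in for the database and web service calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllDemo {

    // Hypothetical stand-in for a per-person task: sleeps to simulate
    // slow I/O, then either fails or returns its name.
    static Callable<String> task(final String name, final long millis, final boolean fail) {
        return new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(millis);
                if (fail) {
                    throw new IllegalStateException("lookup failed for " + name);
                }
                return name;
            }
        };
    }

    // Runs all tasks with a timeout and describes how each future ended.
    static List<String> collect(List<Callable<String>> tasks, long timeoutMillis)
            throws InterruptedException {
        // one thread per task so that all tasks start at once in this demo
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<String> outcomes = new ArrayList<String>();
            for (Future<String> f : executor.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    outcomes.add("ok:" + f.get());
                } catch (CancellationException e) {
                    // the task did not finish within the timeout
                    outcomes.add("cancelled");
                } catch (ExecutionException e) {
                    // the task threw; the original exception is the cause
                    outcomes.add("failed:" + e.getCause().getMessage());
                }
            }
            return outcomes;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(task("fast", 10, false));
        tasks.add(task("broken", 10, true));
        tasks.add(task("slow", 5000, false));
        System.out.println(collect(tasks, 500));
    }
}
```

With these timings the program should print [ok:fast, failed:lookup failed for broken, cancelled]. Whether partial results are acceptable for the client is a design decision; the version above treats any failure or timeout as a failure of the whole request.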

There is no need to check the interrupted state within the callable. If a blocking operation is called within one of the methods, the method will automatically abort execution with an InterruptedException or some other exception (if it is implemented correctly). It is also possible to set the interrupted state instead of throwing an exception. However, that makes less sense for methods with return values.
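
As a rough illustration of this behaviour, the sketch below lets a task block in Thread.sleep, which stands in for an interruptible blocking call, and records whether the cancellation triggered by the invokeAll timeout reaches it as an InterruptedException. The class InterruptDemo and its method are hypothetical. Whether a particular JDBC driver or web service client reacts to interruption in the same way is an assumption you would need to verify.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptDemo {

    // Returns true if the blocked task was cancelled by the timeout and
    // observed an InterruptedException without polling the interrupt flag.
    static boolean blockedTaskIsInterrupted() throws InterruptedException {
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        Callable<String> blocking = new Callable<String>() {
            public String call() throws Exception {
                try {
                    Thread.sleep(60000); // stands in for a blocking call
                    return "done";
                } catch (InterruptedException e) {
                    interrupted.set(true);
                    // Rethrow to end the task. The alternative mentioned
                    // above would be Thread.currentThread().interrupt()
                    // followed by returning some fallback value.
                    throw e;
                } finally {
                    finished.countDown();
                }
            }
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<Future<String>> futures = executor.invokeAll(
                    Collections.singletonList(blocking), 200, TimeUnit.MILLISECONDS);
            // give the worker thread a moment to unwind after the interrupt
            finished.await(5, TimeUnit.SECONDS);
            return futures.get(0).isCancelled() && interrupted.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("interrupted: " + blockedTaskIsInterrupted());
    }
}
```

Note that some blocking calls, such as reads on classic java.io socket streams, do not respond to Thread.interrupt. For those, the task keeps running in the background after the timeout even though its future is already cancelled, so a timeout configured on the connection or client itself is the more reliable safeguard.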

Licensed under: CC-BY-SA with attribution