Question

I created this monitoring thread inside another class to read the status of each executor task as it finishes and to cancel the remaining tasks on failure. The tasks are Runnable.

If any task fails, the overall status has to be 1 (fail).

final CompletionService<MyRunnable> completionService = new ExecutorCompletionService<MyRunnable>(getExecutorService());
final List<Future<MyRunnable>> futures = new ArrayList<Future<MyRunnable>>();

    FutureTask<Integer> tasks = new FutureTask<Integer>(new Callable<Integer>() {

        public Integer call() {

            int status = 0;
            boolean fail = false;

            try {
                for (int i = 0; i < 10; i++) {

                    MyRunnable resultObj = null;

                    try {
                        resultObj = (MyRunnable) completionService.take().get();
                    } catch (CancellationException e) {
                        // Skip it ..
                    }

                    if (!fail) {
                        status = resultObj.getStatus();

                        if (status == 1) {
                            fail = true;
                            for (Future future : futures) {
                                if (!future.isCancelled() && !future.isDone())
                                    future.cancel(true); // cancel pending tasks including running tasks 
                            }
                        }
                    }
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }

            return status;
        }

            });

The thread above is started with:

ExecutorService pool = Executors.newSingleThreadExecutor();
pool.submit(tasks);

Further down, each object is borrowed from the pool, which is a blocking call. I set the pool size to 3, so initially 3 MyRunnable workers are created immediately. As each worker finishes, it is reused to serve the remaining tasks.

for (int i = 0; i < 10; i++) {

    // borrowObject() blocks until a pooled worker is available
    MyRunnable myRunnable = (MyRunnable) this.getGenericObjectPool().borrowObject();

    // configure myRunnable ..

    futures.add(completionService.submit(myRunnable, myRunnable));

}

while (!tasks.isDone()) {

        try {
            Thread.sleep(Global.WaitTime());            
        } catch (InterruptedException iex) {            
        }

}

finalStatus = tasks.get();
pool.shutdown();

GenericObjectPool is configured to reuse objects. I simulated a test in the IDE by forcing the first thread to fail and setting its status to 1. The problem is that as soon as the object was released, it was reused by borrowObject(), and the monitoring thread saw the changed object, whose status had been reset to 0 as part of the activation of a new object done by GenericObjectPool.

So I am not able to read the status from the failed thread. MyRunnable is not a Callable, so I had to trick the Runnable using completionService.submit(obj, obj).

This problem will not happen if I make the pool size 10 or more, because then no object is reused and I can successfully read the status of each one, but that is not an option.
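The effect can be reproduced without the pool. The sketch below is only an illustration: `Worker` and its `reset()` method are hypothetical stand-ins for MyRunnable and for whatever the pool does on activation. It shows that `submit(obj, obj)` hands back the same reference through the Future, so any later reset of the object is what the monitor ends up reading.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedResultDemo {

    // Hypothetical stand-in for the pooled MyRunnable.
    static class Worker implements Runnable {
        private volatile int status = 0;

        @Override
        public void run() {
            status = 1; // simulate a failed task
        }

        int getStatus() {
            return status;
        }

        // Assumption: mimics the pool resetting the object when it is reused.
        void reset() {
            status = 0;
        }
    }

    // Returns the status read through the Future after the worker was reset.
    static int statusSeenAfterReuse() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Worker worker = new Worker();
            Future<Worker> future = executor.submit(worker, worker);
            Worker result = future.get(); // same reference as 'worker'
            worker.reset();               // the object is reused before the status is read
            return result.getStatus();    // the failure status has been overwritten
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("status seen by monitor: " + statusSeenAfterReuse());
    }
}
```

Because the Future carries the object itself rather than a copy of its status, the value 1 set in `run()` is gone by the time it is read.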


Solution

I created a CallableDecorator for the Runnable to fix this. Now I get a proper return value even when using GenericObjectPool. Since reading the status no longer depends on the pooled object, reusing the object does not reset the status that the monitor sees.

So there are two changes in the code. Change

futures.add(completionService.submit(myRunnable, myRunnable));

to

futures.add(completionService.submit(new CallableDecorator(myRunnable)));

and add a new class:

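import java.util.concurrent.Callable;

public class CallableDecorator implements Callable<Integer> {

       private final IRunnable r;

       public CallableDecorator(IRunnable r) {

           this.r = r;
       }

       // Run the task and return its status as a value,
       // so the monitor no longer reads it from the pooled object
       @Override
       public Integer call() {

           r.run();
           return r.statusCode();
       }
}

interface IRunnable extends Runnable {
     Integer statusCode();
}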

Similarly, resultObj in the monitor thread must be changed to an Integer so that it receives the returned status value.
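Putting both changes together, the monitor loop might look roughly like the sketch below. It is not the original code: `Worker`, `runAll`, and the fixed pool of 3 threads are assumptions made to keep the example self-contained, and the object pool is left out entirely. It only shows the CompletionService typed as Integer, with the status arriving as a value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MonitorSketch {

    interface IRunnable extends Runnable {
        Integer statusCode();
    }

    static class CallableDecorator implements Callable<Integer> {
        private final IRunnable r;

        CallableDecorator(IRunnable r) {
            this.r = r;
        }

        @Override
        public Integer call() {
            r.run();
            return r.statusCode();
        }
    }

    // Hypothetical worker that finishes with a preset status (0 = ok, 1 = fail).
    static class Worker implements IRunnable {
        private final int outcome;
        private volatile int status = 0;

        Worker(int outcome) {
            this.outcome = outcome;
        }

        @Override
        public void run() {
            status = outcome;
        }

        @Override
        public Integer statusCode() {
            return status;
        }
    }

    // Submits one task per outcome and returns 1 if any task reported failure.
    static int runAll(int[] outcomes) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int outcome : outcomes) {
            futures.add(completionService.submit(new CallableDecorator(new Worker(outcome))));
        }

        int overall = 0;
        try {
            for (int i = 0; i < outcomes.length; i++) {
                try {
                    Integer status = completionService.take().get(); // a value, not the pooled object
                    if (overall == 0 && status == 1) {
                        overall = 1;
                        for (Future<Integer> future : futures) {
                            future.cancel(true); // no effect on tasks that already finished
                        }
                    }
                } catch (CancellationException e) {
                    // Task was cancelled after an earlier failure; skip it.
                }
            }
        } finally {
            executor.shutdownNow();
        }
        return overall;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(new int[] {0, 1, 0, 0, 0}));
    }
}
```

The status is copied out when `call()` returns, so later changes to the worker object do not affect the value the monitor receives from `get()`.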

Licensed under: CC-BY-SA with attribution