Domanda

Sto cercando di utilizzare la classe ThreadPoolExecutor di Java per eseguire un gran numero di compiti di peso pesante con un numero fisso di thread. Ciascuno dei compiti ha molti luoghi in cui può fallire a causa di eccezioni.

Ho sottoclasse ThreadPoolExecutor e ho sovrascritto il metodo afterExecute che dovrebbe fornire tutte le eccezioni non gestite fanno riscontrati durante l'esecuzione di un compito. Tuttavia, non riesco a farlo funzionare.

Ad esempio:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

L'output di questo programma è "Va tutto bene - situazione normale!" anche se l'unica Runnable presentato al pool di thread genera un'eccezione. Qualsiasi indizio di ciò che sta succedendo qui?

Grazie!

È stato utile?

Soluzione

docs :

  

Nota: Quando le azioni sono racchiuse in   attività (come ad esempio FutureTask) sia   esplicitamente o attraverso metodi come   sostengono, questi oggetti dell'attività di cattura e   mantenere eccezioni computazionali, e   in modo da non causare brusca   terminazione, e l'interno   eccezioni non vengono passati a questo   metodo.

Quando si invia un Runnable, potrai ottenere avvolto in un futuro.

L'afterExecute dovrebbe essere qualcosa di simile:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

Altri suggerimenti

ATTENZIONE :. Va notato che questa soluzione bloccherà il thread chiamante


Se si desidera elaborare le eccezioni sollevate dal compito, allora è generalmente meglio usare Callable piuttosto che Runnable.

Callable.call() è consentito di gettare eccezioni controllate, e questi ottenere propagato indietro al thread chiamante:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

Se Callable.call() genera un'eccezione, questa sarà avvolto in un ExecutionException e gettato da Future.get().

Questa è probabile che sia di gran lunga preferibile a sottoclassi ThreadPoolExecutor. Vi dà anche la possibilità di ri-presentare l'attività se l'eccezione è una recuperabile.

La spiegazione di questo comportamento è proprio nel Javadoc per afterExecute :

  

Nota: Quando le azioni sono racchiuse in   attività (come ad esempio FutureTask) sia   esplicitamente o attraverso metodi come   sostengono, questi oggetti dell'attività di cattura e   mantenere eccezioni computazionali, e   in modo da non causare brusca   terminazione, e l'interno   eccezioni non vengono passati a questo   metodo.

ho ottenuto intorno ad esso avvolgendo l'eseguibile fornito presentata l'esecutore.

CompletableFuture.runAsync(

        () -> {
                try {
                        runnable.run();
                } catch (Throwable e) {
                        Log.info(Concurrency.class, "runAsync", e);
                }
        },

        executorService
);

sto usando classe VerboseRunnable da jcabi-log , che inghiotte tutte le eccezioni e li registra. Molto comodo, ad esempio:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

Un'altra soluzione potrebbe essere quella di utilizzare il ManagedTask e ManagedTaskListener .

Hai bisogno di una Callable o Runnable , che implementa l'interfaccia ManagedTask .

Il metodo getManagedTaskListener restituisce l'istanza che si desidera.

public ManagedTaskListener getManagedTaskListener() {

E si implementa in ManagedTaskListener il metodo taskDone:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

Maggiori informazioni su gestite del ciclo di vita delle applicazioni e ascoltatore .

Se si desidera monitorare l'esecuzione del compito, si potrebbe girare 1 o 2 filetti (forse di più a seconda del carico) e li usa per fare i compiti da un involucro ExecutionCompletionService.

Questo funziona

  • È derivato da SingleThreadExecutor, ma si può adattare facilmente
  • codice Java 8 lamdas, ma facile da risolvere

Si creerà un esecutore con un unico filo, che può ottenere un sacco di compiti; e rimarrà in attesa di quello attuale per l'esecuzione fine di iniziare con il prossimo

In caso di errore o eccezione uncaugth uncaughtExceptionHandler si prenderlo

public final class SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable)  -> {
            final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
            });
            return newThread;
        };
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue(),
                        factory){


                    protected void afterExecute(Runnable runnable, Throwable throwable) {
                        super.afterExecute(runnable, throwable);
                        if (throwable == null && runnable instanceof Future) {
                            try {
                                Future future = (Future) runnable;
                                if (future.isDone()) {
                                    future.get();
                                }
                            } catch (CancellationException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt(); // ignore/reset
                            }
                        }
                        if (throwable != null) {
                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                        }
                    }
                });
    }



    private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    private static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future submit(Runnable task) {
            return e.submit(task);
        }
        public  Future submit(Callable task) {
            return e.submit(task);
        }
        public  Future submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public  List> invokeAll(Collection> tasks)
                throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public  List> invokeAll(Collection> tasks,
                                             long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public  T invokeAny(Collection> tasks)
                throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public  T invokeAny(Collection> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }



    private SingleThreadExecutorWithExceptions() {}
}

Se il ExecutorService proviene da una fonte esterna (.. Io e che non è possibile sottoclasse ThreadPoolExecutor e sovrascrivere afterExecute()), è possibile utilizzare un proxy dinamica per ottenere il comportamento desiderato:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}

Questo è causa di AbstractExecutorService :: submit è avvolgendo il runnable in RunnableFuture (altro che FutureTask) come qui di seguito

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Poi execute passerà a Worker e Worker.run() chiamerà il seguito.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
  

Infine task.run(); nella chiamata codice sopra chiamerà   FutureTask.run(). Ecco il codice gestore di eccezioni, a causa della   questo non si ottiene l'eccezione prevista.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Invece di sottoclassi ThreadPoolExecutor, vorrei dotarla di un ThreadFactory esempio che crea nuove discussioni e fornisce loro una UncaughtExceptionHandler

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top