Gestione delle eccezioni da compiti ExecutorService Java
-
20-09-2019 - |
Domanda
Sto cercando di utilizzare la classe ThreadPoolExecutor
di Java per eseguire un gran numero di compiti di peso pesante con un numero fisso di thread. Ciascuno dei compiti ha molti luoghi in cui può fallire a causa di eccezioni.
Ho sottoclasse ThreadPoolExecutor
e ho sovrascritto il metodo afterExecute
che dovrebbe fornire tutte le eccezioni non gestite fanno riscontrati durante l'esecuzione di un compito. Tuttavia, non riesco a farlo funzionare.
Ad esempio:
public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super( 1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}
public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}
L'output di questo programma è "Va tutto bene - situazione normale!" anche se l'unica Runnable presentato al pool di thread genera un'eccezione. Qualsiasi indizio di ciò che sta succedendo qui?
Grazie!
Soluzione
docs :
Nota: Quando le azioni sono racchiuse in attività (come ad esempio FutureTask) sia esplicitamente o attraverso metodi come sostengono, questi oggetti dell'attività di cattura e mantenere eccezioni computazionali, e in modo da non causare brusca terminazione, e l'interno eccezioni non vengono passati a questo metodo.
Quando si invia un Runnable, potrai ottenere avvolto in un futuro.
L'afterExecute dovrebbe essere qualcosa di simile:
public final class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
}
Altri suggerimenti
ATTENZIONE :. Va notato che questa soluzione bloccherà il thread chiamante
Se si desidera elaborare le eccezioni sollevate dal compito, allora è generalmente meglio usare Callable
piuttosto che Runnable
.
Callable.call()
è consentito di gettare eccezioni controllate, e questi ottenere propagato indietro al thread chiamante:
Callable task = ...
Future future = executor.submit(task);
try {
future.get();
} catch (ExecutionException ex) {
ex.getCause().printStackTrace();
}
Se Callable.call()
genera un'eccezione, questa sarà avvolto in un ExecutionException
e gettato da Future.get()
.
Questa è probabile che sia di gran lunga preferibile a sottoclassi ThreadPoolExecutor
. Vi dà anche la possibilità di ri-presentare l'attività se l'eccezione è una recuperabile.
La spiegazione di questo comportamento è proprio nel Javadoc per afterExecute :
Nota: Quando le azioni sono racchiuse in attività (come ad esempio FutureTask) sia esplicitamente o attraverso metodi come sostengono, questi oggetti dell'attività di cattura e mantenere eccezioni computazionali, e in modo da non causare brusca terminazione, e l'interno eccezioni non vengono passati a questo metodo.
ho ottenuto intorno ad esso avvolgendo l'eseguibile fornito presentata l'esecutore.
CompletableFuture.runAsync(
() -> {
try {
runnable.run();
} catch (Throwable e) {
Log.info(Concurrency.class, "runAsync", e);
}
},
executorService
);
sto usando classe VerboseRunnable
da jcabi-log , che inghiotte tutte le eccezioni e li registra. Molto comodo, ad esempio:
import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
new VerboseRunnable(
Runnable() {
public void run() {
// the code, which may throw
}
},
true // it means that all exceptions will be swallowed and logged
),
1, 1, TimeUnit.MILLISECONDS
);
Un'altra soluzione potrebbe essere quella di utilizzare il ManagedTask e ManagedTaskListener .
Hai bisogno di una Callable o Runnable , che implementa l'interfaccia ManagedTask .
Il metodo getManagedTaskListener
restituisce l'istanza che si desidera.
public ManagedTaskListener getManagedTaskListener() {
E si implementa in ManagedTaskListener il metodo taskDone
:
@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
if (exception != null) {
LOGGER.log(Level.SEVERE, exception.getMessage());
}
}
Maggiori informazioni su gestite del ciclo di vita delle applicazioni e ascoltatore .
Se si desidera monitorare l'esecuzione del compito, si potrebbe girare 1 o 2 filetti (forse di più a seconda del carico) e li usa per fare i compiti da un involucro ExecutionCompletionService.
Questo funziona
- È derivato da SingleThreadExecutor, ma si può adattare facilmente
- codice Java 8 lamdas, ma facile da risolvere
Si creerà un esecutore con un unico filo, che può ottenere un sacco di compiti; e rimarrà in attesa di quello attuale per l'esecuzione fine di iniziare con il prossimo
In caso di errore o eccezione uncaugth uncaughtExceptionHandler si prenderlo
public final class SingleThreadExecutorWithExceptions { public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory factory = (Runnable runnable) -> { final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); }); return newThread; }; return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), factory){ protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) { try { Future future = (Future) runnable; if (future.isDone()) { future.get(); } } catch (CancellationException ce) { throwable = ce; } catch (ExecutionException ee) { throwable = ee.getCause(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); // ignore/reset } } if (throwable != null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); } } }); } private static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown(); } } /** * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ private static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List> invokeAll(Collection> tasks) throws InterruptedException { return e.invokeAll(tasks); } public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } private SingleThreadExecutorWithExceptions() {} }
Se il ExecutorService
proviene da una fonte esterna (.. Io e che non è possibile sottoclasse ThreadPoolExecutor
e sovrascrivere afterExecute()
), è possibile utilizzare un proxy dinamica per ottenere il comportamento desiderato:
public static ExecutorService errorAware(final ExecutorService executor) {
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] {ExecutorService.class},
(proxy, method, args) -> {
if (method.getName().equals("submit")) {
final Object arg0 = args[0];
if (arg0 instanceof Runnable) {
args[0] = new Runnable() {
@Override
public void run() {
final Runnable task = (Runnable) arg0;
try {
task.run();
if (task instanceof Future<?>) {
final Future<?> future = (Future<?>) task;
if (future.isDone()) {
try {
future.get();
} catch (final CancellationException ce) {
// Your error-handling code here
ce.printStackTrace();
} catch (final ExecutionException ee) {
// Your error-handling code here
ee.getCause().printStackTrace();
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
} catch (final RuntimeException re) {
// Your error-handling code here
re.printStackTrace();
throw re;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
} else if (arg0 instanceof Callable<?>) {
args[0] = new Callable<Object>() {
@Override
public Object call() throws Exception {
final Callable<?> task = (Callable<?>) arg0;
try {
return task.call();
} catch (final Exception e) {
// Your error-handling code here
e.printStackTrace();
throw e;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
}
}
return method.invoke(executor, args);
});
}
Questo è causa di AbstractExecutorService :: submit
è avvolgendo il runnable
in RunnableFuture
(altro che FutureTask
) come qui di seguito
AbstractExecutorService.java
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
execute(ftask);
return ftask;
}
Poi execute
passerà a Worker
e Worker.run()
chiamerà il seguito.
ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /////////HERE////////
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
Infine
task.run();
nella chiamata codice sopra chiameràFutureTask.run()
. Ecco il codice gestore di eccezioni, a causa della questo non si ottiene l'eccezione prevista.
class FutureTask<V> implements RunnableFuture<V>
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) { /////////HERE////////
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
Invece di sottoclassi ThreadPoolExecutor, vorrei dotarla di un ThreadFactory esempio che crea nuove discussioni e fornisce loro una UncaughtExceptionHandler