Question

I am writing a service that calls a few external services. I use futures to represent the results of those calls, and I collapse them into a single future using the Futures.successfulAsList() method from the Guava library.

Here is my code:

List<ListenableFuture<List<T>>> futureList = new ArrayList<>();

for(int id: shardIds) {
    ListeningExecutorService service =
          (ListeningExecutorService) _shardMgr.getExecutorService(id);
    SelectTask task = new SelectTask(_shardMgr.getReadHandle(id), sql, mapper);


    ListenableFuture<List<T>> future = service.submit(task);
    //Add Callback
    Futures.addCallback(future, new ErrorCallBack(task),
            Executors.newFixedThreadPool(1));
    futureList.add(future);
}

ListenableFuture<List<List<T>>> combinedFuture =
        Futures.successfulAsList(futureList);
int timeout = _dbTimeout.get();
List<T> selectResult = new ArrayList<T>();

try {
    List<List<T>> result = combinedFuture.get(timeout, TimeUnit.MILLISECONDS);
    for(List<T> sublist: result) {
        for(T t : sublist) {
            //TODO: Do we want to put a cap on how many results we return here?
            //I think we should
            selectResult.add(t);
        }
    }
}
catch(Exception ex) {
    log.error("******************* Exception in parallelSelect ",ex);
    throw new RuntimeException("Error in parallelSelect");
}

When one of my futures (an external service call) fails, ErrorCallBack's onFailure() is called, but execution still gets past combinedFuture.get(timeout, TimeUnit.MILLISECONDS), and I get a NullPointerException at the line for(T t : sublist) ... while iterating over the results.

I expect that when one external service call fails, I should not get past combinedFuture.get().

Am I doing something wrong? I even tried throwing an exception from ErrorCallBack's onFailure() method.

Here is ErrorCallBack's implementation:

private class ErrorCallBack<T> implements FutureCallback<List<T>>  {
    private final SelectTask _task;

    public ErrorCallBack(SelectTask task) {
        _task = task;
    }

    @Override
    public void onFailure(Throwable t) {
        log.error("ErrorCallBack:onFailure(). Enter");
        DBErrorType type = DBErrorType.UNKNOWN;
        try {
            log.error("ErrorCallBack:onFailure(). Exception ",t);
            if(t instanceof InterruptedException || t instanceof CancellationException) {
                type = DBErrorType.UNKNOWN;
            } else if ( t instanceof SQLException || t.getCause() instanceof SQLException) {
                type = DBErrorType.SQL_SYNTAX_ERROR;
            } else if ( t instanceof MySQLSyntaxErrorException || t.getCause() instanceof MySQLSyntaxErrorException) {
                type = DBErrorType.SQL_SYNTAX_ERROR;
            } else if ( t instanceof ExecutionException) {
                type = DBErrorType.SQL_SYNTAX_ERROR;
            } else if (t instanceof TimeoutException) {
                type = DBErrorType.NETWORK_ERROR;
            } else {
                type = DBErrorType.UNKNOWN;
            }
            ShardHandle handle = _task.getShardHandle();
            _shardMgr.reportException(handle, type);
            DBException exception = new DBException(handle.getShardInfo(), type, ErrorSeverity.CRITICAL, t);
            _alertModule.handleAlert(exception.getAlertContext());
        } catch( Exception ex) {
        }
    }

    @Override
    public void onSuccess(List<T> result) {}
}

Solution

"I expect that when one external service call fails, I should not get past combinedFuture.get()"

Well, no, since you're calling Futures.successfulAsList(), which, as its name implies, returns the results of the successful futures (and null in place of each one that failed or was cancelled). For the behaviour you want, call Futures.allAsList(), which gives you a Future that fails if any of its component futures fails.
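To make the difference concrete, here is a minimal sketch that feeds the same pair of futures (one successful, one failed) to both methods. The class name CombineFuturesDemo, the sample strings, and the IllegalStateException are illustrative assumptions, not part of the original code. It uses already-completed futures, so nothing blocks.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; names and sample data are illustrative only.
final class CombineFuturesDemo {

    // Two stand-in "shard" results: one succeeds, one fails.
    static List<ListenableFuture<List<String>>> sampleFutures() {
        ListenableFuture<List<String>> ok =
                Futures.immediateFuture(Arrays.asList("a", "b"));
        ListenableFuture<List<String>> failed =
                Futures.immediateFailedFuture(new IllegalStateException("shard down"));
        return Arrays.asList(ok, failed);
    }

    // successfulAsList: the combined future completes normally and the
    // slot belonging to the failed input holds null.
    // getUnchecked is used here only to keep the sketch free of checked exceptions.
    static List<List<String>> viaSuccessfulAsList() {
        return Futures.getUnchecked(Futures.successfulAsList(sampleFutures()));
    }

    // allAsList: get() throws ExecutionException as soon as any input has failed.
    static boolean allAsListFails() {
        try {
            Futures.allAsList(sampleFutures()).get();
            return false;
        } catch (ExecutionException e) {
            return true; // e.getCause() is the IllegalStateException created above
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaSuccessfulAsList()); // second element is null
        System.out.println(allAsListFails());
    }
}
```

In the question's code, swapping successfulAsList for allAsList would route a failed shard into the existing catch block around combinedFuture.get(...) instead of into the result loop.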

Since you're not checking for nulls in the results, you get the NPE.
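If partial results are acceptable and you want to keep successfulAsList(), the result loop needs a null check. A minimal sketch, assuming a hypothetical helper named flattenSkippingNulls (not part of the original code) that takes the List<List<T>> returned by combinedFuture.get(...):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class; not part of the original code.
final class ShardResults {

    // Flattens per-shard results, skipping the null slots that
    // successfulAsList() leaves for failed or cancelled futures.
    static <T> List<T> flattenSkippingNulls(List<List<T>> perShard) {
        List<T> out = new ArrayList<>();
        for (List<T> sublist : perShard) {
            if (sublist == null) {
                continue; // this shard's future did not succeed
            }
            out.addAll(sublist);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> perShard =
                Arrays.asList(Arrays.asList("a", "b"), null, Arrays.asList("c"));
        System.out.println(flattenSkippingNulls(perShard)); // prints [a, b, c]
    }
}
```

This silently drops the failed shards, so it only fits cases where an incomplete result is better than an error.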

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow