Question

I've got some code that submits a request to another thread which may or may not submit that request to yet another thread. That yields a return type of Future<Future<T>>. Is there some non-heinous way to immediately turn this into Future<T> that waits on the completion of the entire future chain?

I'm already using the Guava library to handle other fun concurrency stuff and as a replacement for Google Collections, and it's working well, but I can't seem to find anything for this case.

Solution

Guava 13.0 adds Futures.dereference to do this. It requires a ListenableFuture<ListenableFuture<T>> rather than a plain Future<Future<T>>. Operating on plain Futures would require makeListenable calls, each of which ties up a dedicated thread for the lifetime of the task. The method's new name, JdkFutureAdapters.listenInPoolThread, makes that cost clearer.
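For comparison, if you are on Java 8 or later and can work with CompletableFuture instead of plain Future, the JDK can do the same flattening with thenCompose. This is a different technique from Futures.dereference, shown only as a minimal sketch; the class and method names (FlattenDemo, flatten) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class FlattenDemo {
    // Collapses a nested CompletableFuture into one that completes
    // only when the inner future completes.
    static <T> CompletableFuture<T> flatten(CompletableFuture<CompletableFuture<T>> nested) {
        return nested.thenCompose(inner -> inner);
    }

    public static void main(String[] args) throws Exception {
        // The outer task hands its work off to a second asynchronous task.
        CompletableFuture<CompletableFuture<String>> nested =
            CompletableFuture.supplyAsync(() -> CompletableFuture.supplyAsync(() -> "done"));
        System.out.println(flatten(nested).get());
    }
}
```

No extra thread is parked waiting here, because the result is wired up through completion callbacks rather than a blocking get().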

OTHER TIPS

Here is another possible implementation, which uses the Guava libraries and is a lot simpler.

import java.util.concurrent.*;
import com.google.common.util.concurrent.*;
import com.google.common.base.*;

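public class FFutures {
  // Adapts both levels to ListenableFuture and chains them, so the returned
  // future completes only once the inner future has completed.
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() {
      public ListenableFuture<T> apply(Future<T> f) {
        return Futures.makeListenable(f);
      }
    });
  }
}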

I think this is the best that can be done to implement the contract of Future. I took the tack of being as unclever as possible so as to be sure that it meets the contract. Note especially the implementation of get with timeout.

import java.util.concurrent.*;

public class Futures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return new FlattenedFuture<T>(future);
  }

  private static class FlattenedFuture<T> implements Future<T> {
    private final Future<Future<T>> future;

    public FlattenedFuture(Future<Future<T>> future) {
      this.future = future;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
      if (!future.isDone()) {
        return future.cancel(mayInterruptIfRunning);
      } else {
        while (true) {
          try {
            return future.get().cancel(mayInterruptIfRunning);
          } catch (CancellationException ce) {
            return true;
          } catch (ExecutionException ee) {
            return false;
          } catch (InterruptedException ie) {
            // pass
          }
        }
      }
    }

    public T get() throws InterruptedException, 
                          CancellationException, 
                          ExecutionException 
    {
      return future.get().get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
                                                     CancellationException, 
                                                     ExecutionException, 
                                                     TimeoutException 
    {
      if (future.isDone()) {
        return future.get().get(timeout, unit);
      } else {
        return future.get(timeout, unit).get(0, TimeUnit.SECONDS);
      }
    }

    public boolean isCancelled() {
      if (future.isCancelled()) {
        return true;
      }
      if (!future.isDone()) {
        // The outer task is still running; calling get() here would block.
        return false;
      }
      while (true) {
        try {
          return future.get().isCancelled();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return false;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }

    public boolean isDone() {
      return future.isDone() && innerIsDone();
    }

    private boolean innerIsDone() {
      while (true) {
        try {
          return future.get().isDone();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return true;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }
  }
}
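The get with timeout above gives the inner future zero time once the outer wait has been spent. One alternative, sketched here under my own assumptions rather than taken from the code above, is to treat the timeout as a single budget and pass whatever is left to the inner future. NestedTimeouts and getNested are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class NestedTimeouts {
    // Waits at most `timeout` in total across both levels of the chain.
    static <T> T getNested(Future<Future<T>> outer, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        // First wait: up to the full budget for the outer future.
        Future<T> inner = outer.get(timeout, unit);
        // Second wait: only what remains of the budget, never negative.
        long remaining = Math.max(0L, deadline - System.nanoTime());
        return inner.get(remaining, TimeUnit.NANOSECONDS);
    }

    public static void main(String[] args) throws Exception {
        Future<Integer> inner = CompletableFuture.completedFuture(42);
        Future<Future<Integer>> outer = CompletableFuture.completedFuture(inner);
        System.out.println(getNested(outer, 1, TimeUnit.SECONDS));
    }
}
```

System.nanoTime is used instead of the wall clock so that clock adjustments cannot stretch or shrink the wait.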

You could create a class like:

public class UnwrapFuture<T> implements Future<T> {
    Future<Future<T>> wrappedFuture;

    public UnwrapFuture(Future<Future<T>> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
    }

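    public boolean cancel(boolean mayInterruptIfRunning) {
        try {
            // Note: get() blocks until the outer future has produced the inner one.
            return wrappedFuture.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            // todo: decide how to report this; for now restore the interrupt flag
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            // todo: decide how to report this; the outer task failed, nothing to cancel
            return false;
        }
    }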
    ...
}

You'll have to deal with exceptions that get() can raise but other methods cannot.
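One way to handle that, as a rough sketch: funnel every access to the inner future through a single helper that never blocks and never throws, so isDone, isCancelled and cancel can all share it. DoneFutures and innerIfAvailable are names I made up for this example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class DoneFutures {
    // Returns the inner future if the outer one has completed normally,
    // or null if the outer one is pending, cancelled, or failed.
    static <T> Future<T> innerIfAvailable(Future<Future<T>> outer) {
        if (!outer.isDone() || outer.isCancelled()) {
            return null;
        }
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    // The outer future is done, so this should not block.
                    return outer.get();
                } catch (InterruptedException e) {
                    interrupted = true; // remember the interrupt and retry
                } catch (ExecutionException | CancellationException e) {
                    return null;
                }
            }
        } finally {
            if (interrupted) {
                // Restore the flag so callers can still see the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With this in place, a method like isDone can be written as a null check on the helper's result followed by a call to the inner future, with no try/catch of its own.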

This was my first stab at it but I'm sure there is plenty wrong with it. I'd be more than happy to just replace it with something like Futures.compress(f).

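import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompressedFuture<T> implements Future<T> {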
    private final Future<Future<T>> delegate;

    public CompressedFuture(Future<Future<T>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // While the outer future is still pending, cancelling it cancels the whole chain.
        if (!delegate.isDone()) {
            return delegate.cancel(mayInterruptIfRunning);
        }
        try {
            return delegate.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get().get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        Future<T> next = delegate.get(timeout, unit);
        return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean isCancelled() {
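        if (delegate.isCancelled()) {
            // A cancelled outer future would make get() throw CancellationException.
            return true;
        }
        if (!delegate.isDone()) {
            return false;
        }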
        try {
            return delegate.get().isCancelled();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public boolean isDone() {
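        if (!delegate.isDone()) {
            return false;
        }
        if (delegate.isCancelled()) {
            // A cancelled outer future is done; get() on it would throw CancellationException.
            return true;
        }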
        try {
            return delegate.get().isDone();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            // The outer task failed, so the chain has completed (exceptionally).
            return true;
        }
    }
}
Licensed under: CC-BY-SA with attribution