Question

I have a Java iterator that lists items from a remote location. The item listing comes in "pages", and the "get next page" operation is rather slow. (To be concrete, my iterator is called S3Find and lists objects from Amazon S3).

So, to speed things up I wanted to prefetch one listing page. To do this I used an ExecutorService and a Callable/Future pattern to prefetch "pages" of items. The problem is, the caller of that iterator might just abandon the operation at any time, without informing my class. For example, consider the following loop:

for (S3URL f : new S3Find(topdir).withRecurse(true)) {
    // do something with f
    if (some_condition) break;
}

As a result, I have a resource leak: the ExecutorService that I use to submit the Callable is left alive and running even when there are no more references to the containing S3Find (and even though the next prefetch has already completed).

What's the proper way to handle this? Am I using the wrong approach? Should I just abandon ExecutorService and use a new bare thread for every prefetch (and kill the thread when the prefetch is done)? Note that each fetch of a page takes roughly 500ms, so creating a new thread every time is probably negligible in comparison. One thing I do not want is to require callers to explicitly inform S3Find that they are done iterating (as it will for sure be forgotten by some).

Here is the current prefetching code (inside S3Find):

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches
 * the next page using a {@link S3Find#NextPageGetter} Callable on a
 * separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private Future<ObjectListing> future;
    private final ExecutorService exec;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        exec = Executors.newSingleThreadExecutor();
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
 * Moves currentList to the next page and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private Future<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            exec.shutdown();
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            return exec.submit(worker);
        }
    }
}

/**
 * This class retrieves the "next page" of a truncated ObjectListing.
 * It is meant to be called in a Callable/Future pattern.
 */
private static class NextPageGetter implements Callable<ObjectListing> {
    private final ObjectListing currentList;
    private final AmazonS3 s3;

    public NextPageGetter(AmazonS3 s3, ObjectListing currentList) {
        super();
        this.s3 = s3;
        this.currentList = currentList;
        if (currentList == null || !currentList.isTruncated()) {
            throw new IllegalArgumentException(currentList==null ?
                        "null List" : "List is not truncated");
        }
    }

    @Override
    public ObjectListing call() throws Exception {
        ObjectListing nextList = s3.listNextBatchOfObjects(currentList);
        return nextList;
    }
}

Solution

This is a classic problem that I've run into a couple of times. Happens to me with database connections.

Should I just abandon ExecutorService and use a new bare thread for every prefetch (and kill the thread when the prefetch is done)?

I guess that's your only option. I wouldn't bother killing the thread; just let it finish its job and die in the background, and fork a new thread for the next page. You'll need to join with the thread and use some sort of shared AtomicReference (or similar) to pass the result list between the background thread and the S3Find caller.
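Here's a minimal sketch of that idea, assuming java.util.concurrent.atomic.AtomicReference; the names prefetchResult, startPrefetch and waitForPrefetch are placeholders inside S3Find, not part of your actual API:

// Sketch only: a bare prefetch thread that publishes its result via an AtomicReference.
// "prefetchResult", "startPrefetch" and "waitForPrefetch" are illustrative names.
private final AtomicReference<ObjectListing> prefetchResult = new AtomicReference<>();
private Thread prefetchThread;

private void startPrefetch(final ObjectListing current) {
    prefetchThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // fetch the next page and publish it; the thread then simply dies
            prefetchResult.set(s3.listNextBatchOfObjects(current));
        }
    });
    prefetchThread.setDaemon(true);   // never keeps the JVM alive
    prefetchThread.start();
}

private ObjectListing waitForPrefetch() throws InterruptedException {
    prefetchThread.join();            // wait for the background fetch to finish
    return prefetchResult.getAndSet(null);
}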

One thing I do not want is to require callers to explicitly inform S3Find that they are done iterating (as it will for sure be forgotten by some).

I don't see any easy way to do this "right" without the caller calling some sort of close() method in a try/finally. Can't you be explicit about that in the Javadocs somehow? That's what I've done in my ORMLite database iterators.

S3Find s3Find = new S3Find(topdir).withRecurse(true);
try {
    for (S3URL f : s3Find) {
        ...
    }
} finally {
    s3Find.close();
}

Then in S3Find.close():

public void close() {
    exec.shutdown();
}

In Java 7 they added the try-with-resources construct, which automatically closes any AutoCloseable resource declared in it. That's a big win.
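For example, if S3Find itself implemented AutoCloseable (keeping the close() above), the caller side shrinks to something like this — assuming, as in the question, that withRecurse() returns the S3Find instance:

// Assumes S3Find implements AutoCloseable and close() shuts down the executor.
try (S3Find s3Find = new S3Find(topdir).withRecurse(true)) {
    for (S3URL f : s3Find) {
        // do something with f
    }
}   // close() runs automatically here, even after a break or an exception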

OTHER TIPS

I think I now have a solution that, while using a bare thread as discussed above, is quite simple and very close to the initial version. It still leverages the nice Callable pattern, but wraps the Callable in a FutureTask run on its own Thread, with no ExecutorService at all.

The key thing that I had missed before is that FutureTask implements Runnable (as well as Future), so you can launch it directly via new Thread(task). In other words:

NextPageGetter worker = new NextPageGetter(s3, currentList);
FutureTask<ObjectListing> future = new FutureTask<>(worker);
new Thread(future).start();

and then later:

currentList = future.get();

Now all the resources are happily disposed of, whether the iterator is exhausted or not: the thread disappears as soon as the FutureTask has completed.

For the sake of completeness, here is the modified code (only class Pager has changed):

/**
 * This class holds one ObjectListing (one "page"), and also pre-fetches the next page
 * using a {@link S3Find#NextPageGetter} Callable on a separate thread.
 */
private static class Pager {
    private final AmazonS3 s3;
    private ObjectListing currentList;
    private FutureTask<ObjectListing> future;
    public Pager(AmazonS3 s3, ListObjectsRequest request) {
        this.s3 = s3;
        currentList = s3.listObjects(request);
        future = submitPrefetch();
    }
    public ObjectListing getCurrentPage() {
        return currentList;
    }
    /**
     * Moves currentList to the next page and returns it.
     */
    public ObjectListing getNextPage() {
        if (future == null) return null;
        try {
            currentList = future.get();
            future = submitPrefetch();
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
        }
        return currentList;
    }
    private FutureTask<ObjectListing> submitPrefetch() {
        if (currentList == null || !currentList.isTruncated()) {
            return null;
        } else {
            NextPageGetter worker = new NextPageGetter(s3, currentList);
            FutureTask<ObjectListing> f = new FutureTask<>(worker);
            new Thread(f).start();
            return f;
        }
    }
}
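The iterator that drives Pager is not shown here; a hypothetical consumer loop (using the real ObjectListing/S3ObjectSummary API, but otherwise just a sketch) would look roughly like:

// Sketch of how an iterator might walk the pages produced by Pager.
Pager pager = new Pager(s3, request);
for (ObjectListing page = pager.getCurrentPage(); page != null; page = pager.getNextPage()) {
    for (S3ObjectSummary summary : page.getObjectSummaries()) {
        // hand each object to the caller of the iterator
    }
}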
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow