Question

There's something odd about the implementation of the BoundedExecutor in the book Java Concurrency in Practice.

It's supposed to throttle task submission to the Executor by blocking the submitting thread when enough tasks are already queued or running in the Executor.

This is the implementation (after adding the missing rethrow in the catch clause):

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command) throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();

        try {
            exec.execute(new Runnable() {
                @Override public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

When I instantiate the BoundedExecutor with an Executors.newCachedThreadPool() and a bound of 4, I would expect the number of threads created by the cached thread pool never to exceed 4. In practice, however, it does. I've gotten this little test program to create as many as 11 threads:

public static void main(String[] args) throws Exception {
    class CountingThreadFactory implements ThreadFactory {
        int count;

        @Override public Thread newThread(Runnable r) {
            ++count;
            return new Thread(r);
        }           
    }

    List<Integer> counts = new ArrayList<Integer>();

    for (int n = 0; n < 100; ++n) {
        CountingThreadFactory countingThreadFactory = new CountingThreadFactory();
        ExecutorService exec = Executors.newCachedThreadPool(countingThreadFactory);

        try {
            BoundedExecutor be = new BoundedExecutor(exec, 4);

            for (int i = 0; i < 20000; ++i) {
                be.submitTask(new Runnable() {
                    @Override public void run() {}
                });
            }
        } finally {
            exec.shutdown();
        }

        counts.add(countingThreadFactory.count);
    }

    System.out.println(Collections.max(counts));
}

I think there's a tiny window between the release of the semaphore and the end of the task, in which another thread can acquire a permit and submit a task while the releasing thread hasn't finished yet. In other words, it has a race condition.

Can someone confirm this?

Solution

I see as many as 9 threads created at once. I suspect there is a race condition that causes more threads to be created than required.

This could be because the pool thread has work to do before and after running the task. Even though only 4 threads are inside your block of code at any time, other threads may still be finishing a previous task or getting ready to start a new one.

That is, the thread calls release() while it is still running. Even though it's the last thing your code does, it's not the last thing the thread does before it picks up a new task.
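The distinction between "tasks inside the guarded block" and "threads alive in the pool" can be checked with a small sketch. The class and method names here (PermitWindowDemo, peakConcurrentTasks) are made up for illustration; the semaphore logic mirrors the question's submitTask. It counts how many tasks are between acquire and release at the same moment, which the semaphore does bound, as opposed to how many threads the cached pool creates, which it does not.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the book or the question.
public class PermitWindowDemo {

    /**
     * Submits `tasks` trivial tasks through a semaphore with `bound` permits
     * and returns the peak number of tasks that were inside the guarded
     * section at the same time.
     */
    static int peakConcurrentTasks(int bound, int tasks) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(bound);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                semaphore.acquire(); // blocks the submitter when all permits are taken
                exec.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        running.decrementAndGet();
                    } finally {
                        // The permit is returned here, but the pool thread still has
                        // bookkeeping to do before it can pick up the next task.
                        semaphore.release();
                    }
                });
            }
        } finally {
            exec.shutdown();
            exec.awaitTermination(10, TimeUnit.SECONDS);
        }
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrent tasks: " + peakConcurrentTasks(4, 20000));
    }
}
```

The reported peak cannot exceed the bound, because the counter is decremented before the permit is released. The number of threads the cached pool creates is a separate quantity and can be higher, as the question observed.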

Other answers

BoundedExecutor was indeed intended as an illustration of how to throttle task submission, not as a way to place a bound on thread pool size. There are more direct ways to achieve the latter, as at least one comment pointed out.

But the other answers don't mention the text in the book that says to use an unbounded queue and to

set the bound on the semaphore to be equal to the pool size plus the number of queued tasks you want to allow, since the semaphore is bounding the number of tasks both currently executing and awaiting execution. [JCiP, end of section 8.3.3]

By mentioning unbounded queues and pool size, we were implying (apparently not very clearly) the use of a thread pool of bounded size.
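A minimal sketch of the setup the book's text describes, under the assumption that "a thread pool of bounded size" means something like Executors.newFixedThreadPool, which uses an unbounded queue. The names FixedPoolThrottleDemo, threadsCreated and queuedAllowance are hypothetical. The semaphore bound is the pool size plus the number of queued tasks to allow, and a counting ThreadFactory records how many threads the pool actually creates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not taken from the book.
public class FixedPoolThrottleDemo {

    /** Returns how many threads the fixed pool created while running `tasks` empty tasks. */
    static int threadsCreated(int poolSize, int queuedAllowance, int tasks)
            throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory countingFactory = r -> {
            created.incrementAndGet();
            return new Thread(r);
        };
        // The pool bounds the threads; the semaphore bounds running plus queued tasks.
        ExecutorService exec = Executors.newFixedThreadPool(poolSize, countingFactory);
        Semaphore semaphore = new Semaphore(poolSize + queuedAllowance);
        try {
            for (int i = 0; i < tasks; i++) {
                semaphore.acquire();
                try {
                    exec.execute(() -> {
                        try {
                            // task body would go here
                        } finally {
                            semaphore.release();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    semaphore.release(); // the task never ran, so hand the permit back
                    throw e;
                }
            }
        } finally {
            exec.shutdown();
            exec.awaitTermination(10, TimeUnit.SECONDS);
        }
        return created.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads created: " + threadsCreated(4, 16, 20000));
    }
}
```

With this split, the early release of a permit only lets one more task into the queue; it cannot cause an extra thread to be created, because the pool itself enforces the thread limit.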

What has always bothered me about BoundedExecutor, however, is that it doesn't implement the ExecutorService interface. A modern way to achieve similar functionality and still implement the standard interfaces would be to use Guava's listeningDecorator method and ForwardingListeningExecutorService class.
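For readers without Guava, a rough JDK-only sketch of the same idea is possible by extending AbstractExecutorService. This is a substitute technique, not the Guava approach mentioned above, and the class name ThrottledExecutorService and the helper sumOfSquares are made up for illustration. One assumption to note: because execute() cannot throw InterruptedException, an interrupt while waiting for a permit is reported here as a RejectedExecutionException with the interrupt flag restored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only wrapper; a sketch, not a drop-in replacement for Guava's classes.
public class ThrottledExecutorService extends AbstractExecutorService {
    private final ExecutorService delegate;
    private final Semaphore semaphore;

    public ThrottledExecutorService(ExecutorService delegate, int bound) {
        this.delegate = delegate;
        this.semaphore = new Semaphore(bound);
    }

    @Override public void execute(Runnable command) {
        try {
            semaphore.acquire(); // block the submitter until a permit is free
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for a permit", e);
        }
        try {
            delegate.execute(() -> {
                try {
                    command.run();
                } finally {
                    semaphore.release();
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }

    // Lifecycle methods simply forward to the wrapped service.
    @Override public void shutdown() { delegate.shutdown(); }
    @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
    @Override public boolean isShutdown() { return delegate.isShutdown(); }
    @Override public boolean isTerminated() { return delegate.isTerminated(); }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    /** Usage example: submit(), inherited from AbstractExecutorService, goes through execute(). */
    static int sumOfSquares(int n) throws Exception {
        ThrottledExecutorService exec =
                new ThrottledExecutorService(Executors.newFixedThreadPool(2), 4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int k = i;
                futures.add(exec.submit(() -> k * k));
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get();
            }
            return sum;
        } finally {
            exec.shutdown();
        }
    }
}
```

One known gap in this sketch: tasks returned by shutdownNow() never run, so their permits are never released. That is harmless if the wrapper is discarded after shutdown.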

You are correct in your analysis of the race condition. There are no synchronization guarantees between the ExecutorService and the Semaphore.

However, I am not sure that throttling the number of threads is what the BoundedExecutor is meant for. I think it is meant for throttling the number of tasks submitted to the service. Imagine you have 5 million tasks to submit, and you run out of memory if more than 10,000 of them are queued at once.

If only 4 threads will ever be running at any given time, why would you want to queue up all 5 million tasks? You can use a construct like this one to throttle the number of tasks queued up at any given time. What you should take away from this is that at most 4 tasks are running at any given time.

If the goal is to cap the number of threads, the obvious solution is to use Executors.newFixedThreadPool(4).

Licensed under: CC-BY-SA with attribution