Question

I am submitting Callables to an ExecutorCompletionService, and it seems that the submit() method does not block until the submitted Callable has finished. Here is the code that I have:

    ExecutorService executor = Executors.newFixedThreadPool(30);
    BlockingQueue<Future<Data>> completionQueue = new LinkedBlockingQueue<>();
    ExecutorCompletionService<Data> completionService = new ExecutorCompletionService<Data>(executor, completionQueue);

    while(receivingPackets) {
        Callable<Data> splitPacketCallable = new SplitPacket(packetString);
        completionService.submit(splitPacketCallable);
        try {
            // Allow submit to finish
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException ex) {
            System.out.println("Something went wrong with sleeping");
        }

        try {
            Future<Data> dataFuture = completionService.poll();
            if (dataFuture != null) {
                Data data = dataFuture.get();
                fileWriter.writeLine(data.toString());
            }

        } catch (InterruptedException ex) {
            System.out.println("Error from poll: " + ex.toString());
        } catch (ExecutionException ex) {
            System.out.println("Error from get: " + ex.toString());
        }
    }

    // Finish any remaining threads
    while (!completionQueue.isEmpty()) {
        try {
            Future<Data> dataFuture = completionService.take();
            Data data = dataFuture.get();
            fileWriter.writeLine(data.toString());
        } catch (InterruptedException ex) {
            System.out.println("Error from take: " + ex.toString());
        } catch (ExecutionException ex) {
            System.out.println("Error from get: " + ex.toString());
        }
    }

    fileWriter.close();
    executor.shutdown();

A few things to note:

Data is a class that stores data in a special format. SplitPacket is a class implementing Callable; it takes a String that has arrived and splits it into chunks to be saved in Data. fileWriter is an instance of a Runnable class whose writeLine method writes asynchronously to a single file from multiple threads.

If I play with the sleep in the while loop, I start getting inconsistent results in my output file. If I sleep for 50 ms every time I submit a Callable, everything works perfectly. However, if I sleep for a short time (say 0-5 ms), some results go missing from the output. To me, this implies that the submit() method of ExecutorCompletionService does not block. However, because waiting for a submitted Callable to finish seems vital, I also assume I am just implementing this wrong.

In my case, I don't know how many packets will be coming in so I need to be able to continuously add Callables to the Executor. I have tried this with a for loop instead of a while loop so that I can send a given number of packets and see if they get printed on the other end, and I can only get them to go through if I have a delay after submit.

Is there a way to fix this without adding a hack-y delay?


Solution

submit() indeed does not block: it hands the task to the executor and returns immediately. If you look at the source of ExecutorCompletionService, you will see that each Future is added to completionQueue only once its task is done:

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
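
The effect of done() can be observed in a small self-contained sketch. It is not taken from the question: the class name EmptyQueueDemo, the method observe() and the CountDownLatch that holds the task open are all assumptions made for illustration. The latch guarantees that the task is still running when poll() is called, so the outcome does not depend on timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original question.
class EmptyQueueDemo {

    static String observe() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ExecutorCompletionService<String> service = new ExecutorCompletionService<>(executor);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The task cannot finish until the latch is released.
            service.submit(() -> {
                release.await();
                return "done";
            });

            // submit() has already returned, but the task is unfinished,
            // so no Future has been queued yet and poll() returns null.
            Future<String> early = service.poll();

            release.countDown();

            // take() blocks until done() has put the Future on the queue.
            Future<String> late = service.take();
            return (early == null ? "empty" : "ready") + " then " + late.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe()); // prints "empty then done"
    }
}
```

The same thing happens in the question's final loop: an isEmpty() check on the queue says nothing about tasks that have been submitted but have not completed.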

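So you may have an empty queue while tasks are still running. Your final loop checks completionQueue.isEmpty(), so it can exit before the remaining tasks have finished, and their results are never written. With the 50 ms sleep, each task has most likely completed before the queue is checked, which is why the delay hides the problem. The simplest thing you can do is just count the tasks.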

    int count = 0; // tasks submitted but not yet retrieved
    while(receivingPackets) {
        ...
        completionService.submit(splitPacketCallable);
        ++count;
        ...
        try {
            Future<Data> dataFuture = completionService.poll();
            if (dataFuture != null) {
                --count;
                ...
            }
        } ... // catch blocks as in the question
    }

    // Finish any remaining tasks; take() blocks until the next one completes
    while (count-- > 0) {
        ...
    }
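
The counting approach above can be sketched as a complete program. This is only an illustration under stated assumptions: the class CountingDrainDemo and the method process() are hypothetical, String.toUpperCase() stands in for SplitPacket, a List of strings stands in for the incoming packets, and a second List stands in for fileWriter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names here do not come from the question.
class CountingDrainDemo {

    static List<String> process(List<String> packets) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<String> service = new ExecutorCompletionService<>(executor);
        List<String> written = new ArrayList<>(); // stand-in for fileWriter
        int pending = 0;                          // submitted but not yet retrieved
        try {
            for (String packet : packets) {
                // Stand-in for new SplitPacket(packetString).
                service.submit(() -> packet.toUpperCase());
                pending++;

                // Collect a finished result if one is available; never wait here.
                Future<String> finished = service.poll();
                if (finished != null) {
                    pending--;
                    written.add(finished.get());
                }
            }

            // Drain by count, not by queue emptiness: take() blocks until
            // the next task completes, so no result is skipped.
            while (pending > 0) {
                written.add(service.take().get());
                pending--;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "b", "c")).size()); // prints 3
    }
}
```

Results arrive in completion order, not submission order, so a caller that needs the original order would have to carry a sequence number in the result.
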
Licensed under: CC-BY-SA with attribution