質問

I am submitting Callables to an ExecutorCompletionService and it seems like the submit() method does not block code while submitting Callables. Here is the code that I have:

    ExecutorService executor = Executors.newFixedThreadPool(30);
    BlockingQueue<Future<Data>> completionQueue = new LinkedBlockingQueue();
    ExecutorCompletionService<Data> completionService = new ExecutorCompletionService<Data>(executor, completionQueue);

    while(receivingPackets) {
        Callable<Data> splitPacketCallable = new SplitPacket(packetString);
        completionService.submit(splitPacketCallable);
        try {
            // Allow submit to finish
            TimeUnit.MILLISECONDS.sleep(50);
        } catch (InterruptedException ex) {
            System.out.println("Something went wrong with sleeping");
        }

        try {
            Future<Data> dataFuture = completionService.poll();
            if (dataFuture != null) {
                Data data = dataFuture.get();
                fileWriter.writeLine(data.toString());
            }

        } catch (InterruptedException ex) {
            System.out.println("Error from poll: " + ex.toString());
        } catch (ExecutionException ex) {
            System.out.println("Error from get: " + ex.toString());
        }
    }

    // Finish any remaining threads
    while (!completionQueue.isEmpty()) {
        try {
            Future<Data> dataFuture = completionService.take();
            Data data = dataFuture.get();
            fileWriter.writeLine(data.toString());
        } catch (InterruptedException ex) {
            System.out.println("Error from take: " + ex.toString());
        } catch (ExecutionException ex) {
            System.out.println("Error from get: " + ex.toString());
        }
    }

    fileWriter.close();
    executor.shutdown();

A few things to note:

Data is a class that stores data in a special format. SplitPacket is a class that implements Callable that takes in a String that has arrived and splits it into chunks to be saved in Data. fileWriter and its method writeLine is a Runnable Class that will asynchronously write to a single file from multiple threads.

If I play with the sleep in the for loop, I start getting inconstant results in my output file. If I sleep for 50 ms every time I submit a Callable, everything works perfectly. However, if I submit with a low value (say 0-5 ms), I start getting dropped threads in the output. To me, this implies that the submit() method of ExecutorCompletionService does not block. However, because blocking a submitted callable seems vital, I also assume I am just implementing this wrong.

In my case, I don't know how many packets will be coming in so I need to be able to continuously add Callables to the Executor. I have tried this with a for loop instead of a while loop so that I can send a given number of packets and see if they get printed on the other end, and I can only get them to go through if I have a delay after submit.

Is there a way to fix this without adding a hack-y delay?

役に立ちましたか?

解決

If you look at the source of ExecutorCompletionService you will see that the Futures are being added to completionQueue after the task is marked as done.

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

You may have an empty queue but still running tasks. The simplest thing you can do is just count the tasks.

int count = 0;
while(receivingPackets) {
    ...
    completionService.submit(splitPacketCallable);
    ++count;
    ...
    try {
        Future<Data> dataFuture = completionService.poll();
        if (dataFuture != null) {
            --count;
            ...
        }
    ...
}

// Finish any remaining threads
while (count-- > 0) {
    ...    
}
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top