Question

So I think I sort of understand how fixed thread pools work (using the Executor.fixedThreadPool built into Java), but from what I can see, there's usually a set number of jobs you want done and you know how many to when you start the program. For example

int numWorkers = Integer.parseInt(args[0]);
int threadPoolSize = Integer.parseInt(args[1]);
ExecutorService tpes =
    Executors.newFixedThreadPool(threadPoolSize);
WorkerThread[] workers = new WorkerThread[numWorkers];
for (int i = 0; i < numWorkers; i++) {
    workers[i] = new WorkerThread(i);
    tpes.execute(workers[i]);
}

Where each workerThread does something really simple,that part is arbitrary. What I want to know is, what if you have a fixed pool size (say 8 max) but you don't know how many workers you'll need to finish the task until runtime.

The specific example is: If I have a pool size of 8 and I'm reading from standard input. As I read, I split the input into blocks of a set size. Each one of these blocks is given to a thread (along with some other information) so that they can compress it. As such, I don't know how many threads I'll need to create as I need to keep going until I reach the end of the input. I also have to somehow ensure that the data stays in the same order. If thread 2 finishes before thread 1 and just submits its work, my data will be out of order!

Would a thread pool be the wrong approach in this situation then? It seems like it'd be great (since I can't use more than 8 threads at a time).

Basically, I want to do something like this:

ExecutorService tpes = Executors.newFixedThreadPool(threadPoolSize);
BufferedInputStream inBytes = new BufferedInputStream(System.in);
byte[] buff = new byte[BLOCK_SIZE];
byte[] dict = new byte[DICT_SIZE];
WorkerThread worker;
int bytesRead = 0;

while((bytesRead = inBytes.read(buff)) != -1) {
   System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
   worker = new WorkerThread(buff, dict)   
   tpes.execute(worker);
}

This is not working code, I know, but I'm just trying to illustrate what I want.

I left out a bit, but see how buff and dict have changing values and that I don't know how long the input is. I don't think I can't actually do this thought because, well worker already exists after the first call! I can't just say worker = new WorkerThread a bunch of time since isn't it already pointing towards an existing thread (true, a thread that might be dead) and obviously in this implemenation if it did work I wouldn't be running in parallel. But my point is, I want to keep creating threads until I hit the max pool size, wait till a thread is done, then keep creating threads until I hit the end of the input.

I also need to keep stuff in order, which is the part that's really annoying.

Was it helpful?

Solution

Your solution is completely fine (the only point is that parallelism is perhaps not necessary if the workload of your WorkerThreads is very small).

With a thread pool, the number of submitted tasks is not relevant. There may be less or more than the number of threads in the pool, the thread pool takes care of that.

However, and this is important: You rely on some kind of order of the results of your WorkerThreads, but when using parallelism, this order is not guaranteed! It doesn't matter whether you use a thread pool, or how much worker threads you have, etc., it will always be possible that your results will be finished in an arbitrary order!

To keep the order right, give each WorkerThread the number of the current item in its constructor, and let them put their results in the right order after they are finished:

int noOfWorkItem = 0;
while((bytesRead = inBytes.read(buff)) != -1) {
   System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
   worker = new WorkerThread(buff, dict, noOfWorkItem++)   
   tpes.execute(worker);
}

OTHER TIPS

As @ignis points out, parallel execution may not be the best answer for your situation. However, to answer the more general question, there are several other Executor implementations to consider beyond FixedThreadPool, some of which may have the characteristics that you desire.

As far as keeping things in order, typically you would submit tasks to the executor, and for each submission, you get a Future (which is an object that promises to give you a result later, when the task finishes). So, you can keep track of the Futures in the order that you submitted tasks, and then when all tasks are done, invoke get() on each Future in order, to get the results.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top