Question

Is it possible to set a priority for tasks that are executed by Executors? I've found some statements in JCIP saying that it's possible, but I cannot find any example, and I cannot find anything related in the docs.

From JCIP:

An execution policy specifies the "what, where, when, and how" of task execution, including:

  • ...
  • In what order should tasks be executed (FIFO, LIFO, priority order)?
  • ...

UPD: I realized that I asked not exactly what I wanted to ask. What I really wanted is:

How can I set (or emulate setting) thread priority, i.e. what used to be thread.setPriority(), with the executors framework?


Solution

Currently the main concrete implementations of the Executor interface in java.util.concurrent are ThreadPoolExecutor and ScheduledThreadPoolExecutor (Java 7 also added ForkJoinPool).

Instead of using the utility / factory class Executors, you should create an instance using a constructor.

You can pass a BlockingQueue to the constructors of the ThreadPoolExecutor.

One of the implementations of BlockingQueue, PriorityBlockingQueue, lets you pass a Comparator to its constructor, which lets you decide the order of execution.
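As a rough illustration of this answer, here is a minimal sketch that passes a PriorityBlockingQueue with a Comparator to the ThreadPoolExecutor constructor. The class PriorityExecuteSketch and the task type PrioritizedTask are hypothetical names made up for this example. It assumes tasks are handed over with execute() rather than submit(), because submit() wraps each task in a FutureTask that the comparator below could not cast.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityExecuteSketch {

    // Hypothetical task type: a Runnable that carries its own priority.
    static class PrioritizedTask implements Runnable {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    // Queues tasks with priorities 5, 1, 9 and returns the order they ran in.
    static List<Integer> runDemo() throws InterruptedException {
        // Lower number = runs earlier. The cast assumes only PrioritizedTask is queued.
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((PrioritizedTask) r).priority);

        ThreadPoolExecutor exec = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(10, byPriority));

        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the single worker so the following tasks have to wait in the queue.
        exec.execute(new PrioritizedTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        for (int p : new int[] {5, 1, 9}) {
            exec.execute(new PrioritizedTask(p, () -> order.add(p)));
        }

        gate.countDown();   // release the worker; it now drains the queue by priority
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints [1, 5, 9]
    }
}
```

The ordering only applies to tasks that are actually waiting in the queue; a task handed straight to an idle worker starts immediately regardless of its priority.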

OTHER TIPS

The idea here is to use a PriorityBlockingQueue in the executor. For this:

  • Create a comparator that would compare our futures.
  • Create a proxy for the Future to hold a priority.
  • Override 'newTaskFor' in order to wrap every future in our proxy.

First you need to hold priority on your future:

// Wraps another RunnableFuture and attaches a priority to it.
class PriorityFuture<T> implements RunnableFuture<T> {

    private final RunnableFuture<T> src;
    private final int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Delegate to the timed get so the timeout is honored.
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }
}

Next you need to define a comparator that correctly sorts the priority futures:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Next let's assume we have a lengthy job like this:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Then, to execute these jobs in priority order, the code will look like this:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }

        // Let the queued jobs finish, then allow the JVM to exit.
        exec.shutdown();
    }
}

This is a lot of code but that's nearly the only way this can be accomplished.

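On my machine the output looks like the following (the first two jobs start immediately because both worker threads are idle; the rest are taken from the queue in priority order):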

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97

You can implement your own ThreadFactory and set it within ThreadPoolExecutor like this:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

where my OpJobThreadFactory looks like the following:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}

You can use a ThreadPoolExecutor with a PriorityBlockingQueue; see "How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks".

You can specify a ThreadFactory in the ThreadPoolExecutor constructor (or Executors factory method). This allows you to provide threads of a given thread priority for the executor.

To get different thread priorities for different jobs, you'd need to send them to executors with different thread factories.
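A minimal sketch of that idea, assuming two fixed pools created through the Executors.newFixedThreadPool(int, ThreadFactory) factory method. The class name PerPriorityExecutors and the helpers withPriority and observedPriority are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class PerPriorityExecutors {

    // Hypothetical helper: a factory whose threads all get the given priority.
    static ThreadFactory withPriority(int priority) {
        return r -> {
            Thread t = new Thread(r);
            t.setPriority(priority);
            return t;
        };
    }

    // Runs one task on the executor and reports the Java-level priority
    // of the worker thread that executed it.
    static int observedPriority(ExecutorService exec) throws Exception {
        Callable<Integer> task = () -> Thread.currentThread().getPriority();
        return exec.submit(task).get();
    }

    public static void main(String[] args) throws Exception {
        // One executor per priority level; jobs are routed by choosing the executor.
        ExecutorService low = Executors.newFixedThreadPool(2, withPriority(Thread.MIN_PRIORITY));
        ExecutorService high = Executors.newFixedThreadPool(2, withPriority(Thread.MAX_PRIORITY));
        try {
            System.out.println("low:  " + observedPriority(low));
            System.out.println("high: " + observedPriority(high));
        } finally {
            low.shutdown();
            high.shutdown();
        }
    }
}
```

The value reported here is only the priority recorded on the Java Thread object; whether the operating system scheduler honors it is a separate matter.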

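Please be aware that Thread.setPriority(..) normally has no effect under Linux: with its default settings, the HotSpot JVM does not map Java thread priorities to OS priorities there. See the following links for the full details: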

Just want to add my bit of contribution to this discussion. I've implemented this ReorderingThreadPoolExecutor for a very specific purpose: being able to explicitly bring a task to the front of the executor's BlockingQueue (in this case a LinkedBlockingDeque) whenever I want, without having to deal with priorities (which can lead to deadlocks and are, in any case, fixed).

I'm using this to manage (inside an Android app) the case where I have to download many images that are displayed in a long list view. Whenever the user scrolls down quickly, the executor queue gets flooded with image download requests: by moving the latest ones to the front of the queue, I have achieved much better performance in loading the images that are actually on the screen, delaying the download of the ones that will probably be needed later. Note that I use an internal concurrent map key (which can be as simple as the image URL string) to add the tasks to the executor so that I can retrieve them later for the reordering.
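The following is only a rough sketch of how such reordering could work; it is not the ReorderingThreadPoolExecutor mentioned above. The names ReorderingSketch, executeKeyed and moveToFront are hypothetical, keys are assumed to be unique among pending tasks, and the remove-then-addFirst step is not atomic, so races with shutdown are not handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: an executor whose queued tasks can be moved to the front by key.
public class ReorderingSketch extends ThreadPoolExecutor {
    private final LinkedBlockingDeque<Runnable> deque;
    private final Map<String, Runnable> pending = new ConcurrentHashMap<>();

    private ReorderingSketch(int threads, LinkedBlockingDeque<Runnable> deque) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, deque);
        this.deque = deque;
    }

    static ReorderingSketch create(int threads) {
        return new ReorderingSketch(threads, new LinkedBlockingDeque<>());
    }

    // Queue a task under a key (for example an image URL) so it can be found later.
    public void executeKeyed(String key, Runnable task) {
        Runnable tracked = () -> {
            pending.remove(key);   // the task has started, so it can no longer be reordered
            task.run();
        };
        pending.put(key, tracked);
        execute(tracked);
    }

    // Move the still-queued task for this key to the head of the queue.
    // Returns false if the key is unknown or the task has already started.
    public boolean moveToFront(String key) {
        Runnable task = pending.get(key);
        if (task != null && deque.remove(task)) {
            deque.addFirst(task);
            return true;
        }
        return false;
    }

    // Queues a, b, c behind a blocked worker, promotes c, and returns the run order.
    static List<String> runDemo() throws InterruptedException {
        ReorderingSketch exec = create(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the single worker so the following tasks stay in the queue.
        exec.executeKeyed("blocker", () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        for (String key : new String[] {"a", "b", "c"}) {
            exec.executeKeyed(key, () -> order.add(key));
        }

        exec.moveToFront("c");
        gate.countDown();
        exec.shutdown();
        exec.awaitTermination(10, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints [c, a, b]
    }
}
```

Tasks go in through execute() here, so the object in the queue is the same Runnable stored in the map, which is what lets deque.remove(task) find it.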

There would have been many other ways of doing the same, and maybe it's overcomplicated, but it works fine, and Facebook does something similar in its Android SDK for its own worker thread queue.

Feel free to have a look at the code and give me suggestions; it's inside an Android project, but stripping a few logs and annotations would make the class pure Java 6.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow