Как реализовать приоритезацию задач с помощью ExecutorService в Java 5?

StackOverflow https://stackoverflow.com/questions/807223

Вопрос

Я реализую механизм объединения потоков, в котором мне хотелось бы выполнять задачи с разными приоритетами.Мне бы хотелось иметь хороший механизм, с помощью которого я мог бы отправлять в службу задачу с высоким приоритетом и планировать ее перед другими задачами.Приоритет задачи является внутренним свойством самой задачи (выражаю ли я эту задачу как Callable или Runnable для меня это не важно).

На первый взгляд кажется, что я мог бы использовать PriorityBlockingQueue как очередь задач в моем ThreadPoolExecutor, но эта очередь содержит Runnable объекты, которые могут быть или не быть Runnable задачи, которые я ему отправил.Более того, если я представил Callable задачи, неясно, как это будет когда-либо отображаться.

Есть ли способ сделать это?Я бы предпочел не делать этого самостоятельно, так как в этом случае у меня гораздо больше шансов ошибиться.

(В сторону;да, я осознаю возможность голода на низкоприоритетных должностях в чем-то вроде этого.Дополнительные баллы (?!) за решения, имеющие разумную гарантию справедливости)

Это было полезно?

Решение

На первый взгляд может показаться, что вы можете определить интерфейс для своих задач, который расширяет Runnable или Callable<T> и Comparable.Затем оберните ThreadPoolExecutor с PriorityBlockingQueue в качестве очереди и принимайте только задачи, реализующие ваш интерфейс.

Принимая во внимание ваш комментарий, похоже, что один из вариантов — продлить ThreadPoolExecutor, и переопределить submit() методы.Ссылаться на AbstractExecutorService посмотреть, как выглядят стандартные;все, что они делают, это оборачивают Runnable или Callable в FutureTask и execute() это.Я бы, вероятно, сделал это, написав класс-оболочку, реализующую ExecutorService и делегирует анонимному внутреннему ThreadPoolExecutor.Оберните их во что-то, что имеет для вас приоритет, чтобы Comparator может добраться до этого.

Другие советы

Я решил эту проблему разумным образом и опишу ее ниже для дальнейшего использования для себя и всех, кто столкнется с этой проблемой с параллельными библиотеками Java.

Используя PriorityBlockingQueue поскольку средство удержания задач для последующего выполнения действительно является движением в правильном направлении.Проблема в том, что PriorityBlockingQueue должен быть создан в общем случае, чтобы содержать Runnable экземпляры, и невозможно вызвать compareTo (или подобное) на Runnable интерфейс.

Приступаем к решению проблемы.При создании Исполнителя ему необходимо задать PriorityBlockingQueue.Очереди также должен быть предоставлен собственный компаратор для правильной сортировки на месте:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Теперь взглянем на CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

До этого момента все выглядело довольно прямолинейно.Здесь становится немного липко.Следующая наша задача — разобраться с созданием FutureTasks из Исполнителя.В Исполнителе мы должны переопределить newTaskFor вот так:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Где c это Callable задача, которую мы пытаемся выполнить.Теперь давайте взглянем на CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Обратите внимание на getTask метод.Мы воспользуемся этим позже, чтобы извлечь из этого исходную задачу. CustomFutureTask что мы создали.

И, наконец, давайте изменим исходную задачу, которую мы пытались выполнить:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Вы можете видеть, что мы реализуем Comparable в задаче делегировать фактическим Comparator для MyType.

И вот, настраиваемая приоритезация для Исполнителя с использованием библиотек Java!Это требует некоторого изгиба, но это самое чистое, что мне удалось придумать.Надеюсь, это кому-то поможет!

Вы можете использовать эти вспомогательные классы:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

И

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

И этот вспомогательный метод:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

И затем используйте его следующим образом:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Я попытаюсь объяснить эту проблему с помощью полнофункционального кода.Но прежде чем углубиться в код, я хотел бы рассказать о PriorityBlockingQueue.

ПриоритетБлокировкаОчередь :PriorityBlockingQueue — это реализация BlockingQueue.Он принимает задачи вместе с их приоритетом и сначала отправляет на выполнение задачу с наивысшим приоритетом.Если какие-либо две задачи имеют одинаковый приоритет, нам нужно предоставить специальную логику, чтобы решить, какая задача будет первой.

Теперь давайте сразу перейдем к коду.

Класс водителя :Этот класс создает исполнителя, который принимает задачи и позже отправляет их на выполнение.Здесь мы создаем две задачи: одну с НИЗКИМ приоритетом, а другую с ВЫСОКИМ приоритетом.Здесь мы говорим исполнителю запускать МАКСИМАЛЬНО 1 поток и использовать PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

Класс MyTask :MyTask реализует Runnable и принимает приоритет в качестве аргумента конструктора.При выполнении этой задачи она печатает сообщение, а затем переводит поток в спящий режим на 1 секунду.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Класс MyFutureTask :Поскольку мы используем PriorityBlocingQueue для хранения наших задач, наши задачи должны быть обернуты внутри FutureTask, а наша реализация FutureTask должна реализовывать интерфейс Comparable.Интерфейс Comparable сравнивает приоритет двух разных задач и отправляет на выполнение задачу с наивысшим приоритетом.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Приоритетный класс :Самоочевидный класс приоритета.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Теперь, когда мы запустим этот пример, мы получим следующий вывод

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Несмотря на то, что мы сначала отправили задачу с НИЗКИМ приоритетом, а затем с ВЫСОКИМ приоритетом, но поскольку мы используем PriorityBlockingQueue, любая задача с более высоким приоритетом будет выполняться первой.

Мое решение сохраняет порядок отправки задач с одинаковыми приоритетами.Это улучшение этого отвечать

Порядок выполнения задачи основан на:

  1. Приоритет
  2. Отправить заказ (в пределах того же приоритета)

Класс тестера:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Результат:

А1 Б1 В2 В3 В4 В5 А2 А3 А4 А5 А6 А7 А8

Первой задачей является A1, поскольку на момент ее вставки в очереди не было более высокого приоритета.Задачи B имеют приоритет 1, поэтому выполняются раньше, задачи A имеют приоритет 0, поэтому выполняются позже, но порядок выполнения соответствует порядку отправки:Б1, Б2, Б3, ...А2, А3, А4...

Решение:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

Можно ли было бы иметь один ThreadPoolExecutor для каждого уровня приоритета?А ThreadPoolExecutor может быть создан с помощью ThreadFactory, и вы можете иметь свою собственную реализацию ThreadFactory для установки различных уровней приоритета.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top