Pergunta

Estou implementando um mecanismo de rosca pooling em que eu gostaria de executar tarefas de prioridades diferentes. Eu gostaria de ter um mecanismo legal através do qual eu posso submeter uma tarefa de alta prioridade para o serviço e tê-lo ser agendada antes de outras tarefas. A prioridade da tarefa é uma propriedade intrínseca da própria tarefa (se eu expressar essa tarefa como um Callable ou um Runnable não é importante para mim).

Agora, superficialmente parece que eu poderia usar um PriorityBlockingQueue como a fila de tarefas em minha ThreadPoolExecutor, mas essa fila contém objetos Runnable, que podem ou não ser as tarefas Runnable eu submetidos. Além disso, se eu tiver enviado tarefas Callable, não está claro como isso jamais iria mapear.

Existe uma maneira de fazer isso? Eu realmente prefiro não fazer a minha própria para isso, desde que eu sou muito mais propensos a errar dessa forma.

(Um aparte;.? Sim, estou ciente da possibilidade de fome para trabalhos de menor prioridade em algo como isso Pontos extras () para soluções que têm uma garantia razoável de justiça)

Foi útil?

Solução

À primeira vista, parece que você poderia definir uma interface para as tarefas que se estende Runnable ou Callable<T> e Comparable. Em seguida, enrole uma ThreadPoolExecutor com um PriorityBlockingQueue como a fila, e só aceitam tarefas que implementam a sua interface.

Tomar o seu comentário em conta, parece que uma opção é estender ThreadPoolExecutor, e substituir os métodos submit(). Consulte AbstractExecutorService para ver o que os default parecer; todos eles fazem é envolver o Runnable ou Callable em um FutureTask e execute()-lo. Eu provavelmente fazer isso escrevendo uma classe wrapper que implementa ExecutorService e delegados a um ThreadPoolExecutor interna anônima. Envolvê-los em algo que tem a sua prioridade, de modo que seu Comparator pode chegar a ele.

Outras dicas

Eu ter resolvido este problema de uma forma razoável, e eu vou descrever abaixo para referência futura para mim e qualquer outra pessoa que corre para este problema com as bibliotecas Java simultâneas.

Usando um PriorityBlockingQueue como os meios para agarrar tarefas para execução posterior é de fato um movimento na direção correta. O problema é que o PriorityBlockingQueue deve ser genericamente instanciado para conter casos Runnable, e é impossível compareTo chamada (ou similar) em uma interface Runnable.

Onto resolver o problema. Ao criar o Executor, deve ser dado um PriorityBlockingQueue. A fila deve ainda ser dado um comparador personalizado para fazer adequado no local de classificação:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Agora, uma espiada no CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Tudo olhando muito para a frente até este ponto. Ela recebe um pegajoso pouco aqui. Nosso próximo problema é lidar com a criação de FutureTasks do Executor. No Executor, devemos substituir newTaskFor assim:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Onde c é a tarefa Callable que estamos tentando executar. Agora, vamos dar uma olhada em CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Observe o método getTask. Vamos usar isso mais tarde para pegar a tarefa original deste CustomFutureTask que nós criamos.

E, finalmente, vamos modificar a tarefa original que estávamos tentando executar:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Você pode ver que vamos implementar Comparable na tarefa de delegar ao Comparator real para MyType.

E aí está, priorização personalizado para um Executor utilizando as bibliotecas Java! Leva algum pouco de flexão, mas é a mais limpa que eu tenho sido capaz de chegar a. Espero que este seja útil para alguém!

Você pode usar essas classes auxiliares:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

E

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

E este método auxiliar:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

E , em seguida, usá-lo como este:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Vou tentar explicar esse problema com um código totalmente funcional. Mas antes de mergulhar no código que eu gostaria de explicar sobre PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue é uma implementação do BlockingQueue. Ele aceita as tarefas junto com sua prioridade e envia a tarefa com a mais alta prioridade para a execução em primeiro lugar. Se quaisquer duas tarefas têm a mesma prioridade, então nós precisamos fornecer alguma lógica personalizada para decidir qual tarefa vai primeiro.

Agora vamos entrar na reta código.

classe driver : Esta classe cria um executor que aceita tarefas e apresente posteriormente los para execução. Aqui criamos duas tarefas um com prioridade baixa e outro com alta prioridade. Aqui nós dizemos o executor para executar um máximo de 1 tópicos e usar o PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

classe MyTask : implementos MyTask Runnable e aceita prioridade como um argumento no construtor. Quando essa tarefa é executada, ele imprime uma mensagem e, em seguida, coloca o fio para dormir durante 1 segundo.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

classe MyFutureTask : Como estamos usando PriorityBlocingQueue para a realização de nossas tarefas, nossas tarefas devem ser acondicionados dentro FutureTask e nossa implementação de FutureTask deve implementar a interface Comparable. A interface Comparável compara a prioridade de 2 tarefas e apresenta diferentes a tarefa com a mais alta prioridade para a execução.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

classe de prioridade : Auto classe de prioridade explicativa.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Agora, quando executar este exemplo, temos o seguinte resultado

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Mesmo que apresentou a baixa prioridade primeira, mas tarefa de prioridade ALTA mais tarde, mas já que estamos usando um PriorityBlockingQueue, qualquer tarefa com uma prioridade mais alta será executado em primeiro lugar.

A minha solução preserva a ordem de submissão de tarefas para mesmas prioridades. É uma melhoria da resposta deste

ordem de execução de tarefas é baseado em:

  1. Prioridade
  2. Enviar pedido (dentro de mesma prioridade)

class Tester:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Resultado:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

A primeira tarefa é A1 porque não havia mais elevada prioridade na fila, quando ele foi inserido. tarefas B são 1 prioridade para executados anteriormente, tarefas A são 0 prioridade tão executado mais tarde, mas a ordem de execução é segue a ordem de submissão: B1, B2, B3, ... A2, A3, A4 ...

A solução:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

Seria possível ter um ThreadPoolExecutor para cada nível de prioridade? A ThreadPoolExecutor pode ser instanciado com um ThreadFactory e você poderia ter sua própria implementação de um ThreadFactory para definir os diferentes níveis de prioridade.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top