Pregunta

Estoy implementando un mecanismo de agrupación de subprocesos en el que me gustaría ejecutar tareas de diferentes prioridades.Me gustaría tener un buen mecanismo mediante el cual pueda enviar una tarea de alta prioridad al servicio y programarla antes que otras tareas.La prioridad de la tarea es una propiedad intrínseca de la tarea misma (ya sea que exprese esa tarea como una Callable o un Runnable no es importante para mí).

Ahora, superficialmente parece que me vendría bien un PriorityBlockingQueue como la cola de tareas en mi ThreadPoolExecutor, pero esa cola contiene Runnable objetos, que pueden o no ser el Runnable tareas que le he enviado.Además, si he enviado Callable tareas, no está claro cómo se mapearía esto.

¿Hay alguna forma de hacer esto?Realmente preferiría no hacer esto, ya que es mucho más probable que me equivoque de esa manera.

(Un aparte;Sí, soy consciente de la posibilidad de que se pierdan trabajos de menor prioridad en algo como esto.Puntos extra (?!) por soluciones que tengan una garantía razonable de equidad)

¿Fue útil?

Solución

A primera vista, parecería que podrías definir una interfaz para tus tareas que se extienda Runnable o Callable<T> y Comparable.Luego envuelve un ThreadPoolExecutor con un PriorityBlockingQueue como cola y solo acepte tareas que implementen su interfaz.

Teniendo en cuenta tu comentario, parece que una opción es ampliar ThreadPoolExecutor, y anular el submit() métodos.Referirse a AbstractExecutorService para ver cómo son los predeterminados;lo único que hacen es envolver el Runnable o Callable en un FutureTask y execute() él.Probablemente haría esto escribiendo una clase contenedora que implemente ExecutorService y delega a un interno anónimo ThreadPoolExecutor.Envuélvelos en algo que tenga tu prioridad, para que tu Comparator puede llegar a ello.

Otros consejos

He resuelto este problema de manera razonable, y lo describiré a continuación para referencia futura para mí y cualquier otra persona que tenga este problema con las bibliotecas concurrentes de Java.

Usar un PriorityBlockingQueue como medio para aferrarse a las tareas para su posterior ejecución es, de hecho, un movimiento en la dirección correcta. El problema es que Runnable se debe crear una instancia genérica para contener compareTo instancias, y es imposible llamar a CustomTaskComparator (o similar) en una interfaz newTaskFor.

Para resolver el problema. Al crear el Ejecutor, se le debe dar un c. Además, la cola debe recibir un Comparador personalizado para hacer una clasificación adecuada en el lugar:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Ahora, un vistazo a Callable:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Todo parece bastante sencillo hasta este punto. Se pone un poco pegajoso aquí. Nuestro siguiente problema es lidiar con la creación de FutureTasks del Ejecutor. En el Ejecutor, debemos anular CustomFutureTask como tal:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Donde getTask es la tarea Comparable que estamos tratando de ejecutar. Ahora, echemos un vistazo a Comparator:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Observe el método MyType. Lo usaremos más tarde para obtener la tarea original de este <=> que hemos creado.

Y finalmente, modifiquemos la tarea original que estábamos tratando de ejecutar:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Puede ver que implementamos <=> en la tarea para delegar en el <=> real para <=>.

¡Y ahí lo tiene, priorización personalizada para un ejecutor que usa las bibliotecas Java! Se necesita un poco de flexión, pero es lo más limpio que he podido encontrar. ¡Espero que esto sea útil para alguien!

Puede usar estas clases auxiliares:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

AND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

Y este método auxiliar:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

Y luego utilícelo así:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Intentaré explicar este problema con un código completamente funcional. Pero antes de sumergirme en el código, me gustaría explicar sobre PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue es una implementación de BlockingQueue. Acepta las tareas junto con su prioridad y envía primero la tarea con la prioridad más alta para su ejecución. Si dos tareas tienen la misma prioridad, entonces necesitamos proporcionar una lógica personalizada para decidir qué tarea va primero.

Ahora ingresemos al código inmediatamente.

Clase de controlador : esta clase crea un ejecutor que acepta tareas y luego las envía para su ejecución. Aquí creamos dos tareas, una con prioridad BAJA y la otra con prioridad ALTA. Aquí le decimos al ejecutor que ejecute un MAX de 1 subprocesos y use PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

Clase MyTask : MyTask implementa Runnable y acepta la prioridad como argumento en el constructor. Cuando se ejecuta esta tarea, imprime un mensaje y luego pone el hilo en suspensión durante 1 segundo.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Clase MyFutureTask : Dado que estamos utilizando PriorityBlocingQueue para mantener nuestras tareas, nuestras tareas deben estar incluidas dentro de FutureTask y nuestra implementación de FutureTask debe implementar una interfaz comparable. La interfaz comparable compara la prioridad de 2 tareas diferentes y envía la tarea con la máxima prioridad para la ejecución.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Clase de prioridad : Clase de prioridad autoexplicativa.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Ahora, cuando ejecutamos este ejemplo, obtenemos el siguiente resultado

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Aunque enviamos la prioridad BAJA primero, pero la tarea ALTA prioridad más tarde, pero dado que estamos usando PriorityBlockingQueue, cualquier tarea con una prioridad más alta se ejecutará primero.

Mi solución conserva el orden de envío de tareas para las mismas prioridades. Es una mejora de esta respuesta

El orden de ejecución de la tarea se basa en:

  1. Prioridad
  2. Enviar pedido (con la misma prioridad)

Clase de probador:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Resultado :

  

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

     

La primera tarea es A1 porque no había mayor prioridad en la cola cuando se insertó. Las tareas B tienen 1 prioridad, así que se ejecutaron antes, las tareas A tienen 0 prioridad, así que se ejecutaron después, pero el orden de ejecución es el siguiente: B1, B2, B3, ... A2, A3, A4 ...

La solución:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

¿Sería posible tener un ThreadPoolExecutor para cada nivel de prioridad? Un ThreadPoolExecutor puede ser instanciado con un ThreadFactory y usted podría tener su propia implementación de un ThreadFactory para establecer los diferentes niveles de prioridad.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top