Come implementare la definizione delle priorità delle attività utilizzando un ExecutorService in Java 5?

StackOverflow https://stackoverflow.com/questions/807223

Domanda

Sto implementando un meccanismo di pool di thread in cui mi piacerebbe eseguire attività con priorità diverse. Mi piacerebbe avere un bel meccanismo in base al quale posso inviare un'attività ad alta priorità al servizio e programmarla prima di altre attività. La priorità dell'attività è una proprietà intrinseca dell'attività stessa (se esprimo tale attività come Callable o Runnable non è importante per me).

Ora, superficialmente sembra che potrei usare un PriorityBlockingQueue come la coda delle attività nel mio ThreadPoolExecutor, ma quella coda contiene <=> oggetti, che possono essere o meno le <=> attività che ho inviato ad esso. Inoltre, se ho inviato <=> compiti, non è chiaro come questo possa mai essere mappato.

C'è un modo per farlo? Preferirei davvero non farmi il mio per questo, dal momento che sono molto più propenso a sbagliare in quel modo.

(A parte; sì, sono consapevole della possibilità di morire di fame per lavori a priorità inferiore in qualcosa del genere. Punti extra (?!) per soluzioni che hanno una ragionevole garanzia di equità)

È stato utile?

Soluzione

A prima vista sembrerebbe che tu possa definire un'interfaccia per le tue attività che si estende Runnable o Callable<T> e Comparable. Quindi avvolgi un ThreadPoolExecutor con un PriorityBlockingQueue come coda e accetta solo attività che implementano la tua interfaccia.

Tenendo conto del tuo commento, sembra che un'opzione sia estendere submit() e sovrascrivere i metodi AbstractExecutorService. Fare riferimento a Callable per vedere come appaiono quelli predefiniti; tutto ciò che fanno è avvolgere FutureTask o execute() in ExecutorService e Comparator esso. Probabilmente lo farei scrivendo una classe wrapper che implementa <=> e delega a un interno anonimo <=>. Avvolgili in qualcosa che abbia la tua priorità, in modo che il tuo <=> possa raggiungerlo.

Altri suggerimenti

Ho risolto questo problema in modo ragionevole e lo descriverò di seguito per riferimento futuro a me stesso e a chiunque altro che si imbatte in questo problema con le librerie concorrenti di Java.

Usare un PriorityBlockingQueue come mezzo per trattenere compiti per l'esecuzione successiva è davvero un movimento nella direzione corretta. Il problema è che Runnable deve essere genericamente istanziato per contenere compareTo istanze ed è impossibile chiamare CustomTaskComparator (o simile) su un'interfaccia newTaskFor.

Per risolvere il problema. Quando si crea Executor, deve essere assegnato un c. La coda dovrebbe inoltre essere dotata di un comparatore personalizzato per eseguire correttamente l'ordinamento sul posto:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Ora, una sbirciatina a Callable:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Tutto sembra piuttosto semplice fino a questo punto. Diventa un po 'appiccicoso qui. Il nostro prossimo problema è occuparci della creazione di FutureTasks dall'esecutore. Nell'Executor, dobbiamo sovrascrivere CustomFutureTask in questo modo:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Dove getTask è l'attività Comparable che stiamo tentando di eseguire. Ora diamo un'occhiata a Comparator:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

Nota il metodo MyType. Lo useremo in seguito per estrarre l'attività originale da questo <=> che abbiamo creato.

Infine, modificiamo l'attività originale che stavamo tentando di eseguire:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Puoi vedere che implementiamo <=> nell'attività per delegare l'attuale <=> per <=>.

E il gioco è fatto, priorità personalizzata per un Executor che utilizza le librerie Java! Ci vuole un po 'di flessione, ma è il più pulito che sono stato in grado di inventare. Spero che questo sia utile a qualcuno!

È possibile utilizzare queste classi di supporto:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

e

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

E questo metodo di supporto:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

E quindi usalo in questo modo:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Proverò a spiegare questo problema con un codice completamente funzionale. Ma prima di immergermi nel codice vorrei spiegare su PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue è un'implementazione di BlockingQueue. Accetta le attività insieme alla loro priorità e invia l'attività con la priorità più alta per l'esecuzione prima. Se due attività hanno la stessa priorità, è necessario fornire una logica personalizzata per decidere quale attività inizia per prima.

Ora entriamo subito nel codice.

Classe driver : questa classe crea un esecutore che accetta le attività e in seguito le invia per l'esecuzione. Qui creiamo due attività una con priorità BASSA e l'altra con priorità ALTA. Qui diciamo all'esecutore di eseguire un MAX di 1 thread e usare PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

Classe MyTask : MyTask implementa Runnable e accetta la priorità come argomento nel costruttore. Quando questa attività viene eseguita, stampa un messaggio e quindi mette in pausa il thread per 1 secondo.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Classe MyFutureTask : poiché stiamo usando PriorityBlocingQueue per svolgere i nostri compiti, i nostri compiti devono essere racchiusi in FutureTask e la nostra implementazione di FutureTask deve implementare un'interfaccia comparabile. L'interfaccia comparabile confronta la priorità di 2 diverse attività e invia l'attività con la massima priorità per l'esecuzione.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Classe di priorità : classe di priorità autoesplicativa.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Ora quando eseguiamo questo esempio, otteniamo il seguente output

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Anche se prima abbiamo inviato la priorità BASSA, ma l'attività ALTA priorità in seguito, ma poiché stiamo usando un PriorityBlockingQueue, qualsiasi attività con una priorità più alta verrà eseguita per prima.

La mia soluzione conserva l'ordine di invio delle attività per le stesse priorità. È un miglioramento di questa risposta

Ordine di esecuzione dell'attività si basa su:

  1. Priorità
  2. Invia ordine (con la stessa priorità)

Classe tester:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Risultato:

  

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

     

La prima attività è A1 perché non c'erano priorità più alte nella coda quando è stata inserita. Le attività B hanno 1 priorità, quindi eseguite in precedenza, le attività A hanno 0 priorità, quindi eseguite in seguito, ma l'ordine di esecuzione segue l'ordine di presentazione: B1, B2, B3, ... A2, A3, A4 ...

La soluzione:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

Sarebbe possibile avere un ThreadPoolExecutor per ogni livello di priorità? A ThreadPoolExecutor può essere avviato con un ThreadFactory e potresti avere la tua implementazione di un ThreadFactory per impostare i diversi livelli di priorità.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top