Frage

Ich bin die Umsetzung einen Thread-Pooling-Mechanismus, in dem Ich mag würde Aufgaben unterschiedlicher Prioritäten auszuführen. Ich möchte einen schönen Mechanismus haben, wobei ich eine Aufgabe hohe Priorität in den Dienst einreichen kann und habe es vor anderen Aufgaben geplant werden. Die Priorität der Aufgabe ist eine intrinsische Eigenschaft der Aufgabe selbst (ob ich diese Aufgabe als Callable auszudrücken oder ein Runnable ist mir nicht wichtig).

Nun, oberflächlich sieht es aus wie ich eine PriorityBlockingQueue als Task-Warteschlange in meinem ThreadPoolExecutor verwenden könnte, aber diese Warteschlange enthält Runnable Objekte, die nicht die Runnable Aufgaben sein kann oder ich es eingereicht haben. Außerdem, wenn ich Callable Aufgaben vorgelegt haben, ist es nicht klar ist, wie dies würde je Karte.

Gibt es eine Möglichkeit, dies zu tun? Ich würde wirklich lieber nicht meine eigene Rolle für diesen, da ich viel eher bin, um es falsch, dass die Art und Weise zu bekommen.

(Nebenbei,.? Ja, ich bin über die Möglichkeit des Hungers für niedrigere Priorität Arbeitsplätze in so etwas wie diese Extra-Punkte () für Lösungen, die eine angemessene Garantie für Fairness haben)

War es hilfreich?

Lösung

Auf den ersten Blick scheint es würden Sie eine Schnittstelle für Ihre Aufgaben definieren könnte, die Runnable oder Callable<T> und Comparable erstreckt. Dann wickeln Sie ein ThreadPoolExecutor mit einem PriorityBlockingQueue als die Warteschlange, und nur Aufgaben übernehmen, die Ihre Schnittstelle implementieren.

Unter Ihrem Kommentar zu berücksichtigen, es sieht aus wie eine Option ThreadPoolExecutor zu erweitern ist, und die submit() Methoden außer Kraft setzen. Siehe AbstractExecutorService zu sehen, was die Standardeinstellungen aussehen; alles, was sie tun, ist die Runnable oder Callable in einem FutureTask wickeln und execute() es. Ich würde wahrscheinlich das tun, indem eine Wrapper-Klasse zu schreiben, die ExecutorService und die Delegierten zu einem anonymen inneren ThreadPoolExecutor implementiert. Wickeln Sie sie in etwas, das Ihre Priorität hat, so dass Ihre Comparator es bekommen kann.

Andere Tipps

ich dieses Problem in einer angemessenen Art und Weise gelöst haben, und ich werde es unten an mich und alle anderen für die Zukunft beschreiben, die mit den Java Concurrent Bibliotheken in dieses Problem ausgeführt wird.

ein PriorityBlockingQueue als Mittel verwendet für auf Aufgaben für die spätere Ausführung hält, ist in der Tat eine Bewegung in der richtigen Richtung. Das Problem ist, dass der PriorityBlockingQueue muss allgemein instanziert wird Runnable Instanzen enthalten, und es ist unmöglich compareTo (oder ähnlich) auf einer Runnable Schnittstelle zu nennen.

Auf das Problem zu lösen. Wenn der Executor erstellen, muss es eine PriorityBlockingQueue gegeben werden. Die Warteschlange sollte ferner einen benutzerdefinierten Komparator gegeben wird anstelle Sortier richtigen zu tun:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

Nun, ein Blick auf CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

Alles sieht ziemlich geradlinig bis zu diesem Punkt. Es wird ein bisschen hier klebrig. Unser nächstes Problem ist mit der Schaffung von FutureTasks vom Executor zu beschäftigen. Im Executor, müssen wir als so außer Kraft setzen newTaskFor:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

Wo c ist die Callable Aufgabe, die wir ausführen versuchen. Nun lassen Sie uns einen Blick auf CustomFutureTask haben:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

die getTask Methode Hinweis. Wir werden den Einsatz, die später aus dieser CustomFutureTask die ursprüngliche Aufgabe zu ergreifen, die wir erstellt haben.

Und schließlich wollen sie die ursprüngliche Aufgabe ändern, die wir ausführen versuchen:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

Sie können sehen, dass wir Comparable in der Aufgabe implementieren, um die tatsächliche Comparator für MyType zu delegieren.

Und dort haben Sie es, individuelle Priorisierung für eine Executor der Java-Bibliotheken! Es dauert einige wenig biegen, aber es ist das sauberste, das ich in der Lage gewesen, mit zu kommen. Ich hoffe, dies ist hilfreich für jemanden!

Sie können diese Hilfsklassen verwenden:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

und

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

und diese Hilfsmethode:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

und dann verwenden Sie es wie folgt aus:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

Ich werde versuchen, dieses Problem mit einem voll funktionsfähigen Code zu erklären. Aber vor dem Tauchen in den Code würde Ich mag über PriorityBlockingQueue erklären

PriorityBlockingQueue : PriorityBlockingQueue ist eine Implementierung von Blocking. Er nimmt die Aufgaben zusammen mit ihrer Priorität und legt die Aufgabe mit der höchsten Priorität für die Ausführung zuerst. Wenn zwei Aufgaben gleiche Priorität haben, dann müssen wir einige benutzerdefinierte Logik schaffen, die Aufgabe zu entscheiden, geht zuerst.

Nun lets get in den Code sofort.

Treiberklasse : Diese Klasse erstellt einen Testamentsvollstrecker, die und sie Aufgaben übernimmt später einreicht zur Ausführung. Hier schaffen wir zwei Aufgaben, die man mit Priorität LOW und das andere mit hoher Priorität. Hier sagen wir den Testamentsvollstrecker einen MAX von 1 Themen und verwenden Sie die PriorityBlockingQueue auszuführen.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

MyTask Klasse : MyTask implementiert Runnable und nimmt Priorität als Argument im Konstruktor. Wenn diese Aufgabe ausgeführt wird, druckt er eine Nachricht und legt dann den Faden 1 Sekunde lang schlafen.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask Klasse : Da wir PriorityBlocingQueue nutzen unsere Aufgaben zu halten, unsere Aufgaben innerhalb FutureTask gewickelt werden müssen, und unsere Implementierung von FutureTask müssen vergleichbare Schnittstelle implementieren. Die vergleichbare Schnittstelle vergleicht die Priorität von 2 verschiedenen Aufgaben und legt die Aufgabe mit der höchsten Priorität für die Ausführung.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Prioritätsklasse : Selbsterklärend Prioritätsklasse.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Wenn wir nun dieses Beispiel auszuführen, erhalten wir die folgende Ausgabe

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

Auch wenn wir die niedrige Priorität eingereicht ersten, aber ein hohe Priorität Aufgabe später, aber da wir eine PriorityBlockingQueue verwenden, jede Aufgabe mit einer höheren Priorität wird ausgeführt, zuerst.

Meine Lösung bewahrt submition Reihenfolge der Aufgaben für gleiche Prioritäten. Es ist eine Verbesserung dieser beantworten

Task-Ausführungsreihenfolge basiert auf:

  1. Priorität
  2. Bestellung abschicken (innerhalb derselben Priorität)

Tester Klasse:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

Ergebnis:

  

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

     

Erste Aufgabe ist A1, weil es keine höhere Priorität in der Warteschlange war, als sie eingeführt wurde. B Aufgaben sind 1 Priorität so früher ausgeführt, A Aufgaben 0 Priorität so später ausgeführt wird, aber die Ausführungsreihenfolge ist folgendermaßen submition Reihenfolge: B1, B2, B3, ... A2, A3, A4 ...

Die Lösung:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

Wäre es möglich, einen zu haben, ThreadPoolExecutor für jede Ebene der Priorität? Ein ThreadPoolExecutor mit instanziiert werden ein Thread und Sie könnten Ihre eigene Implementierung von a Thread die verschiedenen Prioritätsstufen eingestellt werden.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top