Java 5でExecutorServiceを使用してタスクの優先順位付けを実装するにはどうすればよいですか?
-
03-07-2019 - |
質問
さまざまな優先度のタスクを実行するスレッドプーリングメカニズムを実装しています。優先度の高いタスクをサービスに送信し、他のタスクの前にスケジュールすることができる素晴らしいメカニズムが欲しいです。タスクの優先順位は、タスク自体の固有のプロパティです(そのタスクをCallable
とRunnable
のどちらで表現するかは重要ではありません)。
今、表面的にはPriorityBlockingQueue
をタスクキューとしてThreadPoolExecutor
を使用できるように見えますが、そのキューには<=>オブジェクトが含まれています。それに。さらに、<=>タスクを送信した場合、これがどのようにマップされるかは明確ではありません。
これを行う方法はありますか?私はそのように間違っている可能性がはるかに高いので、私はこれのために自分自身を転がしたくないのです。
(余談です。はい、このような低優先度の仕事の飢starの可能性を認識しています。公平性を合理的に保証するソリューションの追加ポイント(?!))
解決
最初は、Runnable
またはCallable<T>
とComparable
を拡張するタスクのインターフェイスを定義できるように見えます。次に、ThreadPoolExecutor
をキューとしてPriorityBlockingQueue
でラップし、インターフェースを実装するタスクのみを受け入れます。
コメントを考慮すると、1つのオプションがsubmit()
を拡張し、AbstractExecutorService
メソッドをオーバーライドすることのように見えます。 Callable
を参照して、デフォルトの外観を確認してください。 FutureTask
またはexecute()
をExecutorService
とComparator
でラップするだけです。 <=>を実装し、匿名の内部<=>に委任するラッパークラスを作成することで、おそらくこれを行います。優先度の高いものでそれらをラップし、<=>でそれを取得できるようにします。
他のヒント
私はこの問題を合理的な方法で解決しました。今後、私自身と、Java Concurrentライブラリでこの問題に遭遇した他の人に言及するために、以下で説明します。
PriorityBlockingQueue
を後で実行するためにタスクを保持する手段として使用することは、実際には正しい方向への動きです。問題は、Runnable
インスタンスを含めるためにcompareTo
を一般的にインスタンス化する必要があり、CustomTaskComparator
インターフェイスでnewTaskFor
(または類似)を呼び出すことができないことです。
問題の解決について。エグゼキューターを作成する場合、c
を指定する必要があります。キューには、適切なインプレースソートを行うためのカスタムコンパレータをさらに指定する必要があります。
new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());
今、Callable
の概要:
public class CustomTaskComparator implements Comparator<MyType> {
@Override
public int compare(MyType first, MyType second) {
return comparison;
}
}
この時点までは非常に単純なものでした。ここでは少しべたべたします。次の問題は、ExecutorからFutureTasksを作成することです。エグゼキューターでは、次のようにCustomFutureTask
をオーバーライドする必要があります:
@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
//Override the default FutureTask creation and retrofit it with
//a custom task. This is done so that prioritization can be accomplished.
return new CustomFutureTask(c);
}
getTask
は、実行しようとしているComparable
タスクです。では、Comparator
:
public class CustomFutureTask extends FutureTask {
private CustomTask task;
public CustomFutureTask(Callable callable) {
super(callable);
this.task = (CustomTask) callable;
}
public CustomTask getTask() {
return task;
}
}
MyType
メソッドに注意してください。後で使用して、作成したこの<=>から元のタスクを取得します。
そして最後に、実行しようとしていた元のタスクを変更しましょう:
public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {
private final MyType myType;
public CustomTask(MyType myType) {
this.myType = myType;
}
@Override
public MyType call() {
//Do some things, return something for FutureTask implementation of `call`.
return myType;
}
@Override
public int compareTo(MyType task2) {
return new CustomTaskComparator().compare(this.myType, task2.myType);
}
}
<=>の実際の<=>に委任するタスクに<=>を実装していることがわかります。
これで、Javaライブラリを使用したエグゼキューターのカスタマイズされた優先順位付けができました!少し曲げる必要がありますが、私が考え出した中で最もきれいです。これが誰かに役立つことを願っています!
これらのヘルパークラスを使用できます:
public class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get(timeout, unit);
}
public void run() {
src.run();
}
public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
};
}
AND
public interface PriorityCallable<T> extends Callable<T> {
int getPriority();
}
AND このヘルパーメソッド:
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
}
};
}
AND その後、次のように使用します:
class LenthyJob implements PriorityCallable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor exec = getPriorityExecutor(2);
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}
完全に機能するコードでこの問題を説明しようと思います。しかし、コードに入る前に、PriorityBlockingQueueについて説明したいと思います
PriorityBlockingQueue :PriorityBlockingQueueはBlockingQueueの実装です。タスクとその優先度を受け入れ、最初に実行のために最高の優先度を持つタスクを送信します。 2つのタスクの優先度が同じ場合、カスタムロジックを提供して、どのタスクを最初に実行するかを決定する必要があります。
これですぐにコードにアクセスできます。
Driverクラス:このクラスは、タスクを受け入れて後で実行するために送信するエグゼキューターを作成します。ここでは、優先度が低いタスクと優先度が高いタスクの2つを作成します。ここでは、最大1スレッドを実行し、PriorityBlockingQueueを使用するようエグゼキューターに指示します。
public static void main(String[] args) {
/*
Minimum number of threads that must be running : 0
Maximium number of threads that can be created : 1
If a thread is idle, then the minimum time to keep it alive : 1000
Which queue to use : PriorityBlockingQueue
*/
PriorityBlockingQueue queue = new PriorityBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
1000, TimeUnit.MILLISECONDS,queue);
MyTask task = new MyTask(Priority.LOW,"Low");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.HIGH,"High");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.MEDIUM,"Medium");
executor.execute(new MyFutureTask(task));
}
MyTaskクラス:MyTaskはRunnableを実装し、コンストラクターの引数として優先度を受け入れます。このタスクを実行すると、メッセージが出力され、スレッドが1秒間スリープ状態になります。
public class MyTask implements Runnable {
public int getPriority() {
return priority.getValue();
}
private Priority priority;
public String getName() {
return name;
}
private String name;
public MyTask(Priority priority,String name){
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("The following Runnable is getting executed "+getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyFutureTaskクラス:タスクを保持するためにPriorityBlocingQueueを使用しているため、タスクをFutureTask内にラップし、FutureTaskの実装でComparableインターフェイスを実装する必要があります。 Comparableインターフェースは、2つの異なるタスクの優先度を比較し、実行の優先度が最も高いタスクを送信します。
public class MyFutureTask extends FutureTask<MyFutureTask>
implements Comparable<MyFutureTask> {
private MyTask task = null;
public MyFutureTask(MyTask task){
super(task,null);
this.task = task;
}
@Override
public int compareTo(MyFutureTask another) {
return task.getPriority() - another.task.getPriority();
}
}
優先度クラス:自明の優先度クラス。
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4);
int value;
Priority(int val) {
this.value = val;
}
public int getValue(){
return value;
}
}
この例を実行すると、次の出力が得られます
The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low
優先度が低いタスクを最初に送信し、優先度が高いタスクを後で送信したにもかかわらず、PriorityBlockingQueueを使用しているため、優先度の高いタスクが最初に実行されます。
私のソリューションは、同じ優先度のタスクの送信順序を保持します。これはこの回答
の改善ですタスクの実行順序は以下に基づいています:
- 優先度
- 注文の送信(同じ優先度内)
テスタークラス:
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);
//Priority=0
executorService.submit(newCallable("A1", 200)); //Defaults to priority=0
executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0
executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));
//Priority=1
executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));
executorService.shutdown();
}
private static Runnable newRunnable(String name, int delay) {
return new Runnable() {
@Override
public void run() {
System.out.println(name);
sleep(delay);
}
};
}
private static Callable<Integer> newCallable(String name, int delay) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(name);
sleep(delay);
return 10;
}
};
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
結果:
A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8
最初のタスクはA1です。これは、キューが挿入されたときにそれより高い優先度がなかったためです。 Bタスクは優先度が1なので早めに実行され、Aタスクは優先度が0なので後で実行されますが、実行順序は送信順序に従います:B1、B2、B3、... A2、A3、A4 ...
解決策:
public class PriorityExecutors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
}
private static class PriorityExecutor extends ThreadPoolExecutor {
private static final int DEFAULT_PRIORITY = 0;
private static AtomicLong instanceCounter = new AtomicLong();
@SuppressWarnings({"unchecked"})
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
ComparableTask.comparatorByPriorityAndSequentialOrder()));
}
@Override
public void execute(Runnable command) {
// If this is ugly then delegator pattern needed
if (command instanceof ComparableTask) //Already wrapped
super.execute(command);
else {
super.execute(newComparableRunnableFor(command));
}
}
private Runnable newComparableRunnableFor(Runnable runnable) {
return new ComparableRunnable(ensurePriorityRunnable(runnable));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new ComparableFutureTask<>(ensurePriorityCallable(callable));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
}
private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
: PriorityCallable.of(callable, DEFAULT_PRIORITY);
}
private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
: PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
}
private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
private Long sequentialOrder = instanceCounter.getAndIncrement();
private HasPriority hasPriority;
public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
super(priorityCallable);
this.hasPriority = priorityCallable;
}
public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
super(priorityRunnable, result);
this.hasPriority = priorityRunnable;
}
@Override
public long getInstanceCount() {
return sequentialOrder;
}
@Override
public int getPriority() {
return hasPriority.getPriority();
}
}
private static class ComparableRunnable implements Runnable, ComparableTask {
private Long instanceCount = instanceCounter.getAndIncrement();
private HasPriority hasPriority;
private Runnable runnable;
public ComparableRunnable(PriorityRunnable priorityRunnable) {
this.runnable = priorityRunnable;
this.hasPriority = priorityRunnable;
}
@Override
public void run() {
runnable.run();
}
@Override
public int getPriority() {
return hasPriority.getPriority();
}
@Override
public long getInstanceCount() {
return instanceCount;
}
}
private interface ComparableTask extends Runnable {
int getPriority();
long getInstanceCount();
public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
return (o1, o2) -> {
int priorityResult = o2.getPriority() - o1.getPriority();
return priorityResult != 0 ? priorityResult
: (int) (o1.getInstanceCount() - o2.getInstanceCount());
};
}
}
}
private static interface HasPriority {
int getPriority();
}
public interface PriorityCallable<V> extends Callable<V>, HasPriority {
public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
return new PriorityCallable<V>() {
@Override
public V call() throws Exception {
return callable.call();
}
@Override
public int getPriority() {
return priority;
}
};
}
}
public interface PriorityRunnable extends Runnable, HasPriority {
public static PriorityRunnable of(Runnable runnable, int priority) {
return new PriorityRunnable() {
@Override
public void run() {
runnable.run();
}
@Override
public int getPriority() {
return priority;
}
};
}
}
}
1つのを持つことは可能でしょうか優先度の各レベルのThreadPoolExecutor ThreadPoolExecutor をインスタンス化できますThreadFactoryおよび ThreadFactory を使用して、さまざまな優先度レベルを設定します。
class MaxPriorityThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setPriority(Thread.MAX_PRIORITY);
}
}