كيف يمكنني تنفيذ تحديد أولويات المهام باستخدام ExecutorService في Java 5؟

StackOverflow https://stackoverflow.com/questions/807223

سؤال

أقوم بتنفيذ آلية تجميع الخيوط التي أرغب في تنفيذ المهام ذات الأولويات المختلفة.أرغب في الحصول على آلية جيدة يمكنني من خلالها إرسال مهمة ذات أولوية عالية إلى الخدمة وجدولتها قبل المهام الأخرى.إن أولوية المهمة هي خاصية جوهرية للمهمة نفسها (سواء كنت أعبر عن هذه المهمة على أنها Callable أو أ Runnable ليس مهما بالنسبة لي).

الآن، يبدو ظاهريًا أنه يمكنني استخدام ملف PriorityBlockingQueue كقائمة انتظار المهام في ملفي ThreadPoolExecutor, ، لكن قائمة الانتظار هذه تحتوي على Runnable الكائنات، والتي قد تكون أو لا تكون Runnable المهام التي قدمتها إليها.وعلاوة على ذلك، إذا كنت قد قدمت Callable المهام، ليس من الواضح كيف سيتم تعيين ذلك على الإطلاق.

هل هناك طريقة للقيام بذلك؟أفضل حقًا ألا أقوم بهذا الأمر، لأنني على الأرجح سأخطئ بهذه الطريقة.

(جانبا؛نعم، أنا على علم بإمكانية الحرمان من الوظائف ذات الأولوية المنخفضة في شيء مثل هذا.نقاط إضافية (؟!) للحلول التي تتمتع بضمان معقول للعدالة)

هل كانت مفيدة؟

المحلول

للوهلة الأولى قد يبدو لك أن تعرف واجهة لمهامكم التي تمتد Runnable أو Callable<T> وComparable. ثم لف ThreadPoolExecutor مع PriorityBlockingQueue كما قائمة الانتظار، واستعرض المهام التي تنفذ واجهة فقط.

وأخذ تعليقك بعين الاعتبار، فإنه يبدو وكأنه خيار واحد هو توسيع ThreadPoolExecutor، وتجاوز الأساليب submit(). الرجوع إلى AbstractExecutorService لنرى ما تبدو تلك الافتراضي مثل؛ كل ما يفعلونه هو التفاف Runnable أو Callable في FutureTask وexecute() ذلك. كنت على الارجح القيام بذلك عن طريق كتابة فئة مجمع التي تطبق ExecutorService والمندوبين إلى ThreadPoolExecutor الداخلية مجهول. التفاف عليها في شيء أن تكون له الأولوية، بحيث Comparator الخاص بك يمكن الحصول على ذلك.

نصائح أخرى

ولقد حل هذه المشكلة بطريقة معقولة، وأنا أصف ذلك أدناه للرجوع إليها في المستقبل لنفسي وغيرهم ممن يعمل في هذه المشكلة مع المكتبات المتزامنة جافا.

وباستخدام PriorityBlockingQueue كوسيلة لعقد على مهام التنفيذ لاحق هو في الواقع حركة في الاتجاه الصحيح. المشكلة هي أن PriorityBlockingQueue يجب أن يكون مثيل بشكل عام لاحتواء الحالات Runnable، وأنه من المستحيل أن يدعو compareTo (أو مماثلة) على واجهة Runnable.

وعلى حل المشكلة. عند إنشاء المنفذ، لا بد من إعطاء PriorityBlockingQueue. وينبغي أيضا إعطاء قائمة الانتظار والمقارنة مخصصة للقيام السليم في مكان الفرز:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

والآن، نظرة خاطفة على CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

كل شيء يبحث جميلة على التوالي إلى الأمام حتى هذه اللحظة. فإنه يحصل لزجة قليلا هنا. مشكلتنا التالية هي للتعامل مع إنشاء FutureTasks من المنفذ. في المنفذ، يجب علينا تجاوز newTaskFor وذلك:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

وأين c هي المهمة Callable أن نحاول تنفيذ. الآن، دعونا نلقي نظرة خاطفة على CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

لاحظ طريقة getTask. نحن استخدام ستعمل في وقت لاحق للاستيلاء على المهمة الأصلية للخروج من هذا CustomFutureTask التي قمنا بإنشائها.

وأخيرا، دعونا تعديل المهمة الأصلية التي كنا نحاول تنفيذ:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

ويمكنك أن ترى أن ننفذ Comparable في مهمة تخويل Comparator الفعلية لMyType.

وهناك لديك، وتحديد الأولويات مخصصة لالمنفذ باستخدام مكتبات جافا! الأمر يحتاج إلى بعض القليل من الانحناء، لكنه أنظف لقد كنت قادرة على أن تصل. آمل أن يكون هذا مفيدا لشخص ما!

يمكنك استخدام هذه الفئات المساعدة:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

و

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

و هذه الطريقة المساعدة:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

و ثم استخدمه مثل هذا:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

وسأحاول شرح هذه المشكلة مع رمز تعمل بكامل طاقتها. ولكن قبل الغوص في التعليمات البرمجية أود أن أشرح عن PriorityBlockingQueue

على PriorityBlockingQueue : لPriorityBlockingQueue هو تنفيذ BlockingQueue. أنها تقبل المهام جنبا إلى جنب مع أولوياتها ويقدم المهمة ذات الأولوية العليا لتنفيذ أولا. إذا كان لدى أي اثنين من المهام ذات الأولوية نفسها، ثم نحن بحاجة إلى توفير بعض المنطق مخصص لتقرر أي مهمة يذهب أولا.

والآن دعونا ندخل في الفور التعليمات البرمجية.

<القوي> الطبقة سائق : في هذه الفئة يخلق المنفذ الذي يقبل المهام وبعد رفعها لتنفيذها. نحن هنا إنشاء مهمتين واحدة مع أولوية منخفضة وأخرى مع أولوية عالية. هنا نقول للمنفذ لتشغيل MAX 1 المواضيع واستخدام PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

<قوية> الطبقة MyTask : لMyTask تنفذ Runnable وتقبل الأولوية كحجة في منشئ. عند تشغيل هذه المهمة، فإنه يطبع رسالة ثم يضع موضوع النوم لمدة 1 ثانية.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

<قوية> MyFutureTask الدرجة : لوبما أننا تستخدم PriorityBlocingQueue لعقد مهامنا، يجب أن تكون ملفوفة مهامنا داخل FutureTask وتنفيذنا للFutureTask يجب تنفيذ واجهة قابلة للمقارنة. واجهة المقارنة يقارن الأولوية من 2 مهام مختلفة وترفع المهمة ذات الأولوية العليا لتنفيذها.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

<قوية> الطبقة الأولوية : لالذاتي التفسيرية الطبقة الأولوية.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

والآن عندما نقوم بتشغيل هذا المثال، نحصل على الناتج التالي

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

وعلى الرغم من أننا تقديم أولوية منخفضة لأول مرة، ولكن مهمة ذات أولوية عالية في وقت لاحق، ولكن بما أننا تستخدم PriorityBlockingQueue، أي مهمة مع أولوية أعلى ستنفذ لأول مرة.

يحافظ الحل الخاص بي على ترتيب إرسال المهام لنفس الأولويات.إنه تحسين لهذا إجابة

أمر تنفيذ المهمة مبني على:

  1. أولوية
  2. إرسال الطلب (ضمن نفس الأولوية)

فئة المختبر:

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

نتيجة:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

المهمة الأولى هي A1 لأنه لم تكن هناك أولوية أعلى في قائمة الانتظار عند إدراجها.المهام B لها أولوية واحدة بحيث يتم تنفيذها مبكرًا، والمهام A لها أولوية 0 لذلك يتم تنفيذها لاحقًا، ولكن ترتيب التنفيذ يتبع ترتيب التقديم:ب1، ب2، ب3،...A2، A3، A4...

الحل:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

هل من الممكن أن يكون واحدا ThreadPoolExecutor للحصول على كل مستوى من الأولوية؟ A ThreadPoolExecutor يمكن instanciated مع وThreadFactory والتي يمكن أن يكون التنفيذ الخاصة بك من <لأ href = "https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadFactory.html" يختلط = "نوفولو noreferrer" > ThreadFactory لتعيين مستويات مختلفة الأولوية.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top