Java 5의 ExecutorService를 사용하여 작업 우선 순위를 구현하려면 어떻게해야합니까?

StackOverflow https://stackoverflow.com/questions/807223

문제

다양한 우선 순위의 작업을 실행하고 싶은 스레드 풀링 메커니즘을 구현하고 있습니다. 서비스에 우선 순위가 높은 작업을 제출할 수 있고 다른 작업보다 먼저 일정을 잡을 수있는 좋은 메커니즘을 원합니다. 작업의 우선 순위는 작업 자체의 본질적인 속성입니다 (내가 해당 작업을 Callable 또는 a Runnable 나에게는 중요하지 않습니다).

이제 피상적으로 내가 사용할 수있는 것처럼 보입니다. PriorityBlockingQueue 내 작업 대기열로 ThreadPoolExecutor, 그러나 그 대기열에는 포함되어 있습니다 Runnable 물체, 그렇지 않을 수도 있습니다 Runnable 내가 제출 한 작업. 또한 제출 한 경우 Callable 과제, 이것이 어떻게 매핑 될지 명확하지 않습니다.

이것을 할 방법이 있습니까? 나는 그렇게 잘못 될 가능성이 훨씬 높기 때문에 오히려 내 자신을 굴러 가지 않을 것입니다.

(제쳐두고; 예, 나는 이와 같은 것들에서 우선 순위가 낮은 일자리에 대한 기아의 가능성을 알고 있습니다. 공정성을 합리적으로 보장하는 솔루션을위한 추가 포인트 (?!))

도움이 되었습니까?

해결책

처음에는 홍당무가 연장되는 작업의 인터페이스를 정의 할 수있는 것 같습니다. Runnable 또는 Callable<T> 그리고 Comparable. 그런 다음 a ThreadPoolExecutor a PriorityBlockingQueue 대기열로서 인터페이스를 구현하는 작업 만 수락하십시오.

의견을 고려하면 하나의 옵션이 확장하는 것 같습니다. ThreadPoolExecutor, 그리고 그것을 무시합니다 submit() 행동 양식. 인용하다 AbstractExecutorService 기본 기본의 모습을보기 위해; 그들이하는 일은 Runnable 또는 Callable 안에 FutureTask 그리고 execute() 그것. 나는 아마도 그것을 구현하는 래퍼 클래스를 작성하여 이것을 할 것입니다. ExecutorService 익명의 내면의 대표단 ThreadPoolExecutor. 당신의 우선 순위가있는 무언가로 마무리하면 Comparator 그것을 얻을 수 있습니다.

다른 팁

나는이 문제를 합리적인 방식으로 해결했으며, 나 자신과 Java 동시 도서관과 함께이 문제를 해결하는 다른 사람에 대한 향후 언급을 위해 아래에 설명 할 것입니다.

사용 a PriorityBlockingQueue 나중에 실행을위한 작업을 유지하기위한 수단은 실제로 올바른 방향으로의 움직임입니다. 문제는 PriorityBlockingQueue 일반적으로 포함하도록 인스턴스화해야합니다 Runnable 인스턴스, 전화가 불가능합니다 compareTo (또는 비슷한) a Runnable 상호 작용.

문제를 해결하기 위해. 집행자를 만들 때 PriorityBlockingQueue. 대기열에는 제자리에 올바르게 정렬 할 수 있도록 사용자 정의 비교기가 추가로 제공되어야합니다.

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());

이제 엿보기 CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}

이 시점까지 모든 것이 꽤 똑바로 보입니다. 여기에는 약간 끈적 끈적합니다. 우리의 다음 문제는 집행자로부터 FutureTasks의 생성을 다루는 것입니다. 집행자에서는 우리는 무시해야합니다 newTaskFor 그렇게 :

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}

어디에 c 입니다 Callable 우리가 실행하려는 작업. 이제 엿보기합시다 CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}

주목하십시오 getTask 방법. 우리는 나중에 이것을 사용하여 원래의 작업을 가져갈 것입니다. CustomFutureTask 우리가 만든 것.

마지막으로, 우리가 실행하려는 원래의 작업을 수정합시다.

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}

우리가 구현한다는 것을 알 수 있습니다 Comparable 실제에 위임하는 과제 Comparator ~을 위한 MyType.

그리고 Java 라이브러리를 사용하는 집행자에 대한 맞춤형 우선 순위가 있습니다! 약간의 굽힘이 필요하지만, 내가 생각해 낼 수 있었던 것은 가장 깨끗합니다. 나는 이것이 누군가에게 도움이되기를 바랍니다!

이 헬퍼 클래스를 사용할 수 있습니다.

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get(timeout, unit);
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

그리고

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

그리고 이 도우미 방법 :

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

그리고 그런 다음 다음과 같이 사용하십시오.

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

이 문제를 완전히 기능적인 코드로 설명하려고 노력할 것입니다. 그러나 코드로 뛰어 들기 전에 PriorityBlockingQueue에 대해 설명하고 싶습니다.

PriorityBlockingqueue : PriorityBlockingqueue는 Blockingqueue의 구현입니다. 작업을 우선 순위와 함께 수락하고 먼저 실행에 대한 우선 순위가 가장 높은 작업을 제출합니다. 두 작업이 동일한 우선 순위 인 경우, 먼저 어떤 작업이 먼저 진행되는지 결정하려면 사용자 정의 논리를 제공해야합니다.

이제 코드를 곧바로 들어갑니다.

드라이버 클래스 :이 클래스는 작업을 받아들이고 나중에 실행을 위해 제출하는 집행자를 만듭니다. 여기서 우리는 우선 순위가 낮고 다른 하나는 우선 순위가 높은 두 가지 작업을 만듭니다. 여기서 우리는 집행자에게 최대 1 개의 스레드를 실행하고 우선 순위 블로킹 Queue를 사용하도록 지시합니다.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);


    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.MEDIUM,"Medium");
    executor.execute(new MyFutureTask(task));

}

Mytask 클래스 : MyTask는 실행 가능하고 생성자의 인수로 우선 순위를 수용합니다. 이 작업이 실행되면 메시지를 인쇄 한 다음 스레드를 1 초 동안 잠들게합니다.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

myfuturetask 클래스 : 작업을 유지하기 위해 PriorityBlocingQueue를 사용하고 있으므로 FutureTask 내부에 작업을 마무리해야하며 FutureTask의 구현은 비슷한 인터페이스를 구현해야합니다. 비슷한 인터페이스는 2 개의 서로 다른 작업의 우선 순위를 비교하고 작업을 실행에 가장 높은 우선 순위로 제출합니다.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

우선 순위 클래스 : 자기 설명 우선 순위 클래스.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

이제이 예제를 실행하면 다음 출력을 얻습니다.

The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low

우선 순위가 낮지 만 우선 순위가 높은 작업을 후에 제출했지만 우선 순위가 높은 작업을 사용하고 있기 때문에 우선 순위가 높은 작업이 먼저 실행됩니다.

내 솔루션은 동일한 우선 순위에 대한 작업 순서를 보존합니다. 이것의 개선입니다 대답

작업 실행 순서 다음에 기반합니다 :

  1. 우선 사항
  2. 주문 제출 (동일한 우선 순위 내)

테스터 클래스 :

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);

        //Priority=0
        executorService.submit(newCallable("A1", 200));     //Defaults to priority=0 
        executorService.execute(newRunnable("A2", 200));    //Defaults to priority=0
        executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
        executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
        executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));

        //Priority=1
        executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
        executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
        executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
        executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));

        executorService.shutdown();

    }

    private static Runnable newRunnable(String name, int delay) {
        return new Runnable() {
            @Override
            public void run() {
                System.out.println(name);
                sleep(delay);
            }
        };
    }

    private static Callable<Integer> newCallable(String name, int delay) {
        return new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println(name);
                sleep(delay);
                return 10;
            }
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

}

결과:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

큐가 삽입되었을 때 대기열에 우선 순위가 높지 않기 때문에 첫 번째 작업은 A1입니다. B 작업은 1 우선 순위이므로 이전에 실행되므로 작업은 0 우선 순위이므로 나중에 실행되지만 실행 순서는 Submition 순서를 따릅니다 : B1, B2, B3, ... A2, A3, A4 ...

해결책:

public class PriorityExecutors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
    }

    private static class PriorityExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        @SuppressWarnings({"unchecked"})
        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

        @Override
        public void execute(Runnable command) {
            // If this is ugly then delegator pattern needed
            if (command instanceof ComparableTask) //Already wrapped
                super.execute(command);
            else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(ensurePriorityRunnable(runnable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new ComparableFutureTask<>(ensurePriorityCallable(callable));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
        }

        private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
            return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
                    : PriorityCallable.of(callable, DEFAULT_PRIORITY);
        }

        private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
            return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
                    : PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
        }

        private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private Long sequentialOrder = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;

            public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
                super(priorityCallable);
                this.hasPriority = priorityCallable;
            }

            public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.hasPriority = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }
        }

        private static class ComparableRunnable implements Runnable, ComparableTask {
            private Long instanceCount = instanceCounter.getAndIncrement();
            private HasPriority hasPriority;
            private Runnable runnable;

            public ComparableRunnable(PriorityRunnable priorityRunnable) {
                this.runnable = priorityRunnable;
                this.hasPriority = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return hasPriority.getPriority();
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = o2.getPriority() - o1.getPriority();
                    return priorityResult != 0 ? priorityResult
                            : (int) (o1.getInstanceCount() - o2.getInstanceCount());
                };
            }

        }

    }

    private static interface HasPriority {
        int getPriority();
    }

    public interface PriorityCallable<V> extends Callable<V>, HasPriority {

        public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
            return new PriorityCallable<V>() {
                @Override
                public V call() throws Exception {
                    return callable.call();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

    public interface PriorityRunnable extends Runnable, HasPriority {

        public static PriorityRunnable of(Runnable runnable, int priority) {
            return new PriorityRunnable() {
                @Override
                public void run() {
                    runnable.run();
                }

                @Override
                public int getPriority() {
                    return priority;
                }
            };
        }
    }

}

하나를 가질 수 있습니까? ThreadPooleExecutor 각 우선 순위에 대해? ㅏ ThreadPooleExecutor Threadfactory와 함께 시동 할 수 있으며 귀하는 자신의 구현을 가질 수 있습니다. Threadfactory 다른 우선 순위 수준을 설정합니다.

 class MaxPriorityThreadFactory implements ThreadFactory {
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setPriority(Thread.MAX_PRIORITY);
     }
 }
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top