Java 5의 ExecutorService를 사용하여 작업 우선 순위를 구현하려면 어떻게해야합니까?
-
03-07-2019 - |
문제
다양한 우선 순위의 작업을 실행하고 싶은 스레드 풀링 메커니즘을 구현하고 있습니다. 서비스에 우선 순위가 높은 작업을 제출할 수 있고 다른 작업보다 먼저 일정을 잡을 수있는 좋은 메커니즘을 원합니다. 작업의 우선 순위는 작업 자체의 본질적인 속성입니다 (내가 해당 작업을 Callable
또는 a Runnable
나에게는 중요하지 않습니다).
이제 피상적으로 내가 사용할 수있는 것처럼 보입니다. PriorityBlockingQueue
내 작업 대기열로 ThreadPoolExecutor
, 그러나 그 대기열에는 포함되어 있습니다 Runnable
물체, 그렇지 않을 수도 있습니다 Runnable
내가 제출 한 작업. 또한 제출 한 경우 Callable
과제, 이것이 어떻게 매핑 될지 명확하지 않습니다.
이것을 할 방법이 있습니까? 나는 그렇게 잘못 될 가능성이 훨씬 높기 때문에 오히려 내 자신을 굴러 가지 않을 것입니다.
(제쳐두고; 예, 나는 이와 같은 것들에서 우선 순위가 낮은 일자리에 대한 기아의 가능성을 알고 있습니다. 공정성을 합리적으로 보장하는 솔루션을위한 추가 포인트 (?!))
해결책
처음에는 홍당무가 연장되는 작업의 인터페이스를 정의 할 수있는 것 같습니다. Runnable
또는 Callable<T>
그리고 Comparable
. 그런 다음 a ThreadPoolExecutor
a PriorityBlockingQueue
대기열로서 인터페이스를 구현하는 작업 만 수락하십시오.
의견을 고려하면 하나의 옵션이 확장하는 것 같습니다. ThreadPoolExecutor
, 그리고 그것을 무시합니다 submit()
행동 양식. 인용하다 AbstractExecutorService
기본 기본의 모습을보기 위해; 그들이하는 일은 Runnable
또는 Callable
안에 FutureTask
그리고 execute()
그것. 나는 아마도 그것을 구현하는 래퍼 클래스를 작성하여 이것을 할 것입니다. ExecutorService
익명의 내면의 대표단 ThreadPoolExecutor
. 당신의 우선 순위가있는 무언가로 마무리하면 Comparator
그것을 얻을 수 있습니다.
다른 팁
나는이 문제를 합리적인 방식으로 해결했으며, 나 자신과 Java 동시 도서관과 함께이 문제를 해결하는 다른 사람에 대한 향후 언급을 위해 아래에 설명 할 것입니다.
사용 a PriorityBlockingQueue
나중에 실행을위한 작업을 유지하기위한 수단은 실제로 올바른 방향으로의 움직임입니다. 문제는 PriorityBlockingQueue
일반적으로 포함하도록 인스턴스화해야합니다 Runnable
인스턴스, 전화가 불가능합니다 compareTo
(또는 비슷한) a Runnable
상호 작용.
문제를 해결하기 위해. 집행자를 만들 때 PriorityBlockingQueue
. 대기열에는 제자리에 올바르게 정렬 할 수 있도록 사용자 정의 비교기가 추가로 제공되어야합니다.
new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());
이제 엿보기 CustomTaskComparator
:
public class CustomTaskComparator implements Comparator<MyType> {
@Override
public int compare(MyType first, MyType second) {
return comparison;
}
}
이 시점까지 모든 것이 꽤 똑바로 보입니다. 여기에는 약간 끈적 끈적합니다. 우리의 다음 문제는 집행자로부터 FutureTasks의 생성을 다루는 것입니다. 집행자에서는 우리는 무시해야합니다 newTaskFor
그렇게 :
@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
//Override the default FutureTask creation and retrofit it with
//a custom task. This is done so that prioritization can be accomplished.
return new CustomFutureTask(c);
}
어디에 c
입니다 Callable
우리가 실행하려는 작업. 이제 엿보기합시다 CustomFutureTask
:
public class CustomFutureTask extends FutureTask {
private CustomTask task;
public CustomFutureTask(Callable callable) {
super(callable);
this.task = (CustomTask) callable;
}
public CustomTask getTask() {
return task;
}
}
주목하십시오 getTask
방법. 우리는 나중에 이것을 사용하여 원래의 작업을 가져갈 것입니다. CustomFutureTask
우리가 만든 것.
마지막으로, 우리가 실행하려는 원래의 작업을 수정합시다.
public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {
private final MyType myType;
public CustomTask(MyType myType) {
this.myType = myType;
}
@Override
public MyType call() {
//Do some things, return something for FutureTask implementation of `call`.
return myType;
}
@Override
public int compareTo(MyType task2) {
return new CustomTaskComparator().compare(this.myType, task2.myType);
}
}
우리가 구현한다는 것을 알 수 있습니다 Comparable
실제에 위임하는 과제 Comparator
~을 위한 MyType
.
그리고 Java 라이브러리를 사용하는 집행자에 대한 맞춤형 우선 순위가 있습니다! 약간의 굽힘이 필요하지만, 내가 생각해 낼 수 있었던 것은 가장 깨끗합니다. 나는 이것이 누군가에게 도움이되기를 바랍니다!
이 헬퍼 클래스를 사용할 수 있습니다.
public class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get(timeout, unit);
}
public void run() {
src.run();
}
public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
};
}
그리고
public interface PriorityCallable<T> extends Callable<T> {
int getPriority();
}
그리고 이 도우미 방법 :
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
}
};
}
그리고 그런 다음 다음과 같이 사용하십시오.
class LenthyJob implements PriorityCallable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor exec = getPriorityExecutor(2);
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}
이 문제를 완전히 기능적인 코드로 설명하려고 노력할 것입니다. 그러나 코드로 뛰어 들기 전에 PriorityBlockingQueue에 대해 설명하고 싶습니다.
PriorityBlockingqueue : PriorityBlockingqueue는 Blockingqueue의 구현입니다. 작업을 우선 순위와 함께 수락하고 먼저 실행에 대한 우선 순위가 가장 높은 작업을 제출합니다. 두 작업이 동일한 우선 순위 인 경우, 먼저 어떤 작업이 먼저 진행되는지 결정하려면 사용자 정의 논리를 제공해야합니다.
이제 코드를 곧바로 들어갑니다.
드라이버 클래스 :이 클래스는 작업을 받아들이고 나중에 실행을 위해 제출하는 집행자를 만듭니다. 여기서 우리는 우선 순위가 낮고 다른 하나는 우선 순위가 높은 두 가지 작업을 만듭니다. 여기서 우리는 집행자에게 최대 1 개의 스레드를 실행하고 우선 순위 블로킹 Queue를 사용하도록 지시합니다.
public static void main(String[] args) {
/*
Minimum number of threads that must be running : 0
Maximium number of threads that can be created : 1
If a thread is idle, then the minimum time to keep it alive : 1000
Which queue to use : PriorityBlockingQueue
*/
PriorityBlockingQueue queue = new PriorityBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
1000, TimeUnit.MILLISECONDS,queue);
MyTask task = new MyTask(Priority.LOW,"Low");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.HIGH,"High");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.MEDIUM,"Medium");
executor.execute(new MyFutureTask(task));
}
Mytask 클래스 : MyTask는 실행 가능하고 생성자의 인수로 우선 순위를 수용합니다. 이 작업이 실행되면 메시지를 인쇄 한 다음 스레드를 1 초 동안 잠들게합니다.
public class MyTask implements Runnable {
public int getPriority() {
return priority.getValue();
}
private Priority priority;
public String getName() {
return name;
}
private String name;
public MyTask(Priority priority,String name){
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("The following Runnable is getting executed "+getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
myfuturetask 클래스 : 작업을 유지하기 위해 PriorityBlocingQueue를 사용하고 있으므로 FutureTask 내부에 작업을 마무리해야하며 FutureTask의 구현은 비슷한 인터페이스를 구현해야합니다. 비슷한 인터페이스는 2 개의 서로 다른 작업의 우선 순위를 비교하고 작업을 실행에 가장 높은 우선 순위로 제출합니다.
public class MyFutureTask extends FutureTask<MyFutureTask>
implements Comparable<MyFutureTask> {
private MyTask task = null;
public MyFutureTask(MyTask task){
super(task,null);
this.task = task;
}
@Override
public int compareTo(MyFutureTask another) {
return task.getPriority() - another.task.getPriority();
}
}
우선 순위 클래스 : 자기 설명 우선 순위 클래스.
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4);
int value;
Priority(int val) {
this.value = val;
}
public int getValue(){
return value;
}
}
이제이 예제를 실행하면 다음 출력을 얻습니다.
The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low
우선 순위가 낮지 만 우선 순위가 높은 작업을 후에 제출했지만 우선 순위가 높은 작업을 사용하고 있기 때문에 우선 순위가 높은 작업이 먼저 실행됩니다.
내 솔루션은 동일한 우선 순위에 대한 작업 순서를 보존합니다. 이것의 개선입니다 대답
작업 실행 순서 다음에 기반합니다 :
- 우선 사항
- 주문 제출 (동일한 우선 순위 내)
테스터 클래스 :
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);
//Priority=0
executorService.submit(newCallable("A1", 200)); //Defaults to priority=0
executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0
executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));
//Priority=1
executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));
executorService.shutdown();
}
private static Runnable newRunnable(String name, int delay) {
return new Runnable() {
@Override
public void run() {
System.out.println(name);
sleep(delay);
}
};
}
private static Callable<Integer> newCallable(String name, int delay) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(name);
sleep(delay);
return 10;
}
};
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
결과:
A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8
큐가 삽입되었을 때 대기열에 우선 순위가 높지 않기 때문에 첫 번째 작업은 A1입니다. B 작업은 1 우선 순위이므로 이전에 실행되므로 작업은 0 우선 순위이므로 나중에 실행되지만 실행 순서는 Submition 순서를 따릅니다 : B1, B2, B3, ... A2, A3, A4 ...
해결책:
public class PriorityExecutors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
}
private static class PriorityExecutor extends ThreadPoolExecutor {
private static final int DEFAULT_PRIORITY = 0;
private static AtomicLong instanceCounter = new AtomicLong();
@SuppressWarnings({"unchecked"})
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
ComparableTask.comparatorByPriorityAndSequentialOrder()));
}
@Override
public void execute(Runnable command) {
// If this is ugly then delegator pattern needed
if (command instanceof ComparableTask) //Already wrapped
super.execute(command);
else {
super.execute(newComparableRunnableFor(command));
}
}
private Runnable newComparableRunnableFor(Runnable runnable) {
return new ComparableRunnable(ensurePriorityRunnable(runnable));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new ComparableFutureTask<>(ensurePriorityCallable(callable));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
}
private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
: PriorityCallable.of(callable, DEFAULT_PRIORITY);
}
private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
: PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
}
private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
private Long sequentialOrder = instanceCounter.getAndIncrement();
private HasPriority hasPriority;
public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
super(priorityCallable);
this.hasPriority = priorityCallable;
}
public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
super(priorityRunnable, result);
this.hasPriority = priorityRunnable;
}
@Override
public long getInstanceCount() {
return sequentialOrder;
}
@Override
public int getPriority() {
return hasPriority.getPriority();
}
}
private static class ComparableRunnable implements Runnable, ComparableTask {
private Long instanceCount = instanceCounter.getAndIncrement();
private HasPriority hasPriority;
private Runnable runnable;
public ComparableRunnable(PriorityRunnable priorityRunnable) {
this.runnable = priorityRunnable;
this.hasPriority = priorityRunnable;
}
@Override
public void run() {
runnable.run();
}
@Override
public int getPriority() {
return hasPriority.getPriority();
}
@Override
public long getInstanceCount() {
return instanceCount;
}
}
private interface ComparableTask extends Runnable {
int getPriority();
long getInstanceCount();
public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
return (o1, o2) -> {
int priorityResult = o2.getPriority() - o1.getPriority();
return priorityResult != 0 ? priorityResult
: (int) (o1.getInstanceCount() - o2.getInstanceCount());
};
}
}
}
private static interface HasPriority {
int getPriority();
}
public interface PriorityCallable<V> extends Callable<V>, HasPriority {
public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
return new PriorityCallable<V>() {
@Override
public V call() throws Exception {
return callable.call();
}
@Override
public int getPriority() {
return priority;
}
};
}
}
public interface PriorityRunnable extends Runnable, HasPriority {
public static PriorityRunnable of(Runnable runnable, int priority) {
return new PriorityRunnable() {
@Override
public void run() {
runnable.run();
}
@Override
public int getPriority() {
return priority;
}
};
}
}
}
하나를 가질 수 있습니까? ThreadPooleExecutor 각 우선 순위에 대해? ㅏ ThreadPooleExecutor Threadfactory와 함께 시동 할 수 있으며 귀하는 자신의 구현을 가질 수 있습니다. Threadfactory 다른 우선 순위 수준을 설정합니다.
class MaxPriorityThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setPriority(Thread.MAX_PRIORITY);
}
}