문제

실행 중인 비동기 작업이 거의 없으며 그 중 적어도 하나가 완료될 때까지 기다려야 합니다. 앞으로는 N 작업 중 util M이 완료될 때까지 기다려야 할 것입니다.현재는 Future로 표시되므로 다음과 같은 것이 필요합니다.

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

이런 것도 있나요?또는 이와 유사하지만 Future에는 필요하지 않습니다.현재 나는 미래 컬렉션을 반복하면서 하나가 완료되었는지 확인한 다음 잠시 잠을 자고 다시 확인합니다.오랫동안 잠을 자면 원치 않는 지연이 추가되고, 짧은 시간 동안 잠을 자면 성능에 영향을 미칠 수 있기 때문에 이것이 최선의 해결책은 아닌 것 같습니다.

나는 사용해 볼 수 있었다

new CountDownLatch(1)

작업이 완료되면 카운트다운을 줄이고

countdown.await()

, 하지만 미래 창조를 통제할 때만 가능하다는 것을 알았습니다.가능하지만 현재 작업 생성 논리(ExecutorService에 Callable 보내기)가 어느 Future를 기다리는지 결정하는 것과 분리되어 있기 때문에 시스템 재설계가 필요합니다.재정의할 수도 있습니다.

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

작업이 완료될 때 알림을 받을 리스너를 연결하는 기능을 갖춘 RunnableFuture의 사용자 정의 구현을 생성한 다음 해당 리스너를 필요한 작업에 연결하고 CountDownLatch를 사용합니다. 그러나 이는 내가 사용하는 모든 ExecutorService에 대해 newTaskFor를 재정의해야 함을 의미하며 잠재적으로 구현이 있을 것입니다. AbstractExecutorService를 확장하지 않습니다.동일한 목적으로 ExecutorService를 래핑해 볼 수도 있지만 Future를 생성하는 모든 메서드를 장식해야 합니다.

이러한 모든 솔루션은 작동할 수 있지만 매우 부자연스러워 보입니다.다음과 같은 간단한 것이 빠진 것 같습니다.

WaitHandle.WaitAny(WaitHandle[] waitHandles)

C#에서.이러한 종류의 문제에 대한 잘 알려진 해결책이 있습니까?

업데이트:

원래는 Future Creation에 전혀 접근할 수 없었기 때문에 우아한 해결책이 없었습니다.시스템을 재설계한 후 Future 생성에 액세스할 수 있게 되었고 countDownLatch.countdown()을 실행 프로세스에 추가할 수 있게 되었으며, 그런 다음 countDownLatch.await()를 사용할 수 있게 되었고 모든 것이 잘 작동했습니다.다른 답변을 주셔서 감사합니다. 저는 ExecutorCompletionService에 대해 몰랐고 유사한 작업에 실제로 도움이 될 수 있지만 이 특별한 경우에는 일부 Future가 실행자 없이 생성되기 때문에 사용할 수 없습니다. 실제 작업은 네트워크를 통해 다른 서버로 전송됩니다. 원격으로 완료되고 완료 알림이 수신됩니다.

도움이 되었습니까?

해결책

내가 아는 한 Java에는 Java와 유사한 구조가 없습니다. WaitHandle.WaitAny 방법.

내 생각에는 "WaitableFuture" 데코레이터를 통해 이 작업을 수행할 수 있는 것 같습니다.

public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

이는 실행 코드 앞에 삽입할 수 있는 경우에만 작동하지만, 그렇지 않으면 실행 코드에 새 코드가 포함되지 않기 때문입니다. doTask() 방법.하지만 실행 전에 Future 개체에 대한 제어권을 얻을 수 없다면 폴링 없이는 이 작업을 수행할 방법이 없습니다.

또는 미래가 항상 자체 스레드에서 실행되고 어떻게든 해당 스레드를 얻을 수 있는 경우입니다.그런 다음 새 스레드를 생성하여 서로 다른 스레드를 결합한 다음 결합이 반환된 후 대기 메커니즘을 처리할 수 있습니다.이는 정말 보기 흉하고 많은 오버헤드를 유발합니다.그리고 일부 Future 개체가 완료되지 않으면 죽은 스레드에 따라 차단된 스레드가 많이 있을 수 있습니다.주의하지 않으면 메모리와 시스템 리소스가 누출될 수 있습니다.

/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

다른 팁

간단해요, 확인해 보세요 실행자완료서비스.

결과 대기열을 생성하고 대기열에서 기다리면 어떨까요?또는 더 간단하게는 CompletionService를 사용하는 것입니다.ExecutorService + 결과 큐.

실제로 wait()와 informAll()을 사용하면 꽤 쉽습니다.

먼저 잠금 개체를 정의합니다.(이를 위해 어떤 클래스든 사용할 수 있지만 저는 명시적으로 설명하고 싶습니다):

package com.javadude.sample;

public class Lock {}

다음으로 작업자 스레드를 정의합니다.처리가 끝나면 해당 잠금 개체에 알려야 합니다.알림은 잠금 개체에 대한 동기화된 블록 잠금에 있어야 합니다.

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

마지막으로 클라이언트를 작성할 수 있습니다.

package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

어느 스레드가 완료되는지는 신경 쓰지 않으므로 모든 스레드에 대해 단일 WaitHandle을 갖고 기다리면 어떨까요?먼저 끝내는 사람이 핸들을 설정할 수 있습니다.

이 옵션을 참조하세요:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top