문제

일부 작업 개체를 생성한 다음 스레드에 추가하는 단일 스레드 생산자가 있습니다. ArrayBlockingQueue (고정된 크기입니다).

또한 다중 스레드 소비자를 시작합니다.이는 고정 스레드 풀(Executors.newFixedThreadPool(threadCount);).그런 다음 일부 ConsumerWorker 인스턴스를 이 threadPool에 제출합니다. 각 ConsumerWorker는 위에서 언급한 ArrayBlockingQueue 인스턴스에 대한 참조를 갖습니다.

그러한 각 근로자는 다음을 수행합니다. take() 대기열에 서서 작업을 처리합니다.

내 문제는 더 이상 수행할 작업이 없을 때 작업자에게 알릴 수 있는 가장 좋은 방법이 무엇인지입니다.즉, 생산자가 대기열에 추가를 마쳤음을 작업자에게 어떻게 알릴 수 있으며, 이 시점부터 각 작업자는 대기열이 비어 있음을 확인하면 중지해야 합니다.

지금 내가 얻은 것은 내 프로듀서가 작업(큐에 항목 추가)을 완료할 때 트리거되는 콜백으로 초기화되는 설정입니다.또한 내가 생성하여 ThreadPool에 제출한 모든 ConsumerWorker의 목록을 보관합니다.생산자 콜백이 생산자가 완료되었음을 알려주면 이를 각 작업자에게 알릴 수 있습니다.이 시점에서는 큐가 비어 있지 않은지 계속 확인해야 하며, 큐가 비어 있으면 중지해야 하므로 ExecutorService 스레드 풀을 정상적으로 종료할 수 있습니다.이런 것이에요

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

여기서 문제는 때때로 생산자가 완료하고 신호를 보내고 ConsumerWorkers가 대기열의 모든 것을 소비하기 전에 중지하는 명백한 경쟁 조건이 있다는 것입니다.

내 질문은 모든 것이 제대로 작동하도록 동기화하는 가장 좋은 방법은 무엇입니까?생산자가 실행 중인지 확인하고 큐가 비어 있는지 확인하고 한 블록(큐 개체의)에 있는 큐에서 항목을 가져오는 전체 부분을 동기화해야 합니까?업데이트를 동기화해야 할까요? isRunning ConsumerWorker 인스턴스의 부울?다른 제안이 있나요?

업데이트, 내가 사용한 작업 구현은 다음과 같습니다.

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

이것은 아래 JohnVint의 답변에서 약간의 수정을 거쳐 크게 영감을 받았습니다.

=== @vendhan의 의견으로 인해 업데이트되었습니다.

관찰해 주셔서 감사합니다.당신 말이 맞습니다. 이 질문의 첫 번째 코드 조각에는 (다른 문제 중에서도) while(isRunning || !inputQueue.isEmpty()) 정말 말이 안 돼요.

이것의 실제 최종 구현에서 나는 "||"교체를위한 당신의 제안에 더 가까운 일을합니다. (또는) "&&"(및)와 함께, 각 작업자 (소비자)는 이제 목록에서 얻은 요소가 독약인지 여부를 확인하고, 그렇다면 (이론적으로 우리는 근로자가 가지고 있다고 말할 수 있습니다. 실행 중이며 대기열이 비어 있지 않아야합니다).

도움이 되었습니까?

해결책

큐에서 take()를 계속해야합니다.노동자에게 정지 할 것을 말하기 위해 독약을 사용할 수 있습니다.예 :

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
.

다른 팁

나는 직원들에게 작업을 종료해야 한다는 신호를 보내기 위해 특별 작업 패킷을 보냅니다.

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

작업자를 중지하려면 간단히 추가하십시오. ConsumerWorker.DONE 대기열에.

큐에서 요소를 재개주하려고 시도하는 코드 블록에서 poll(time,unit) 대신 take()를 사용하십시오.

try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 
.

타임 아웃의 적절한 값을 지정하면 스레드가 불행한 시퀀스가있는 경우 스레드가 계속 차단되지 않도록합니다.

  1. isRunning가 true
  2. 큐가 비어 있으므로 스레드가 차단 된 대기를 입력합니다 (take() 사용
  3. isRunning가 false 로 설정됩니다.

우리는 그것을 사용하여 그것을 할 수 없나요? CountDownLatch, 여기서 크기는 생산자의 레코드 수입니다.그리고 모든 소비자는 그럴 것이다. countDown 기록을 처리한 후.그리고 그것은 교차합니다 awaits() 모든 작업이 완료되면 메소드.그럼 모든 소비자를 중지하십시오.모든 기록이 처리되기 때문입니다.

사용할 수있는 전략이 많이 있지만 하나의 간단한 것은 작업의 끝을 알리는 작업의 하위 클래스를 갖는 것입니다.제작자는이 신호를 직접 보내지 않습니다.대신이 작업 서브 클래스의 인스턴스를 큐에 넣습니다.소비자 중 하나 가이 작업을 해제하고 실행하면 신호가 전송됩니다.

멀티 스레드 생산자와 멀티 스레드 소비자를 사용해야했습니다. Scheduler -- N Producers -- M Consumers 구성표로 끝 났으며 각 두 대기열을 통해 통신합니다 (총 2 개의 대기열).스케줄러는 첫 번째 대기열을 요청하여 데이터를 생성 한 다음 N "독약 알약"으로 채 웁니다.활성 생산자 (원자 int)의 카운터가 있으며 마지막 독약 알약을받는 마지막 프로듀서는 M 독약 알약을 소비자 대기열에 보냅니다.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top