Pergunta

I was trying to write code for Producer-Consumer problem. Below code works fine most of the time but stuck sometimes because of "Lost Wake-up" (i guess). I tried thread sleep() but it didn't work. What modification is needed to handle this case in my code? Is semaphore can be helpful here ? If yes, how will i implement them here ?

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <iostream>

using namespace std;

int product = 0;
boost::mutex mutex;
boost::condition_variable cv;
boost::condition_variable pv;
bool done = false;

void consumer(){
    while(done==false){
        //cout << "start c" << endl
        boost::mutex::scoped_lock lock(mutex);
        cv.wait(lock);
        //cout << "wakeup c" << endl;
        if (done==false)
        {
            cout << product << endl;
            //cout << "notify c" << endl;
            pv.notify_one();
        }
        //cout << "end c" << endl;
    }
}

void producer(){
    for(int i=0;i<10;i++){
        //cout << "start p" << endl;
        boost::mutex::scoped_lock lock(mutex);
        boost::this_thread::sleep(boost::posix_time::microseconds(50000));
        ++product;
        //cout << "notify p" << endl;
        cv.notify_one();
        pv.wait(lock);
        //cout << "wakeup p" << endl;
    }
    //cout << "end p" << endl;
    cv.notify_one();
    done = true;
}

int main()
{
    int t = 1000;
    while(t--){
        /*
        This is not perfect, and is prone to a subtle issue called the lost wakeup (for example, producer calls notify() 
        on the condition, but client hasn't really called wait() yet, then both will wait() indefinitely.) 
        */
        boost::thread consumerThread(&consumer);    
        boost::thread producerThread(&producer);

        producerThread.join();
        consumerThread.join();
        done =false;
        //cout << "process end" << endl;
    }
    cout << "done" << endl;
    getchar();
    return 0;
}
Foi útil?

Solução

Yes, you want a way to know (in the consumer) that you "missed" a signal. A semaphore can help. There's more than one way to skin a cat, so here's my simple take on it (using just c++11 standard library features):

class semaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public: 
    semaphore(int count_ = 0) : count(count_) { }

    void notify()
    {
        std::unique_lock<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    void wait() { return wait([]{}); }  // no-op action

    template <typename F>
    auto wait(F&& func = []{}) -> decltype(std::declval<F>()())
    {
        std::unique_lock<std::mutex> lck(mtx);

        while(count == 0){
            cv.wait(lck);
        }
        count--;

        return func();
    }
};

For convenience, I added a convenience wait() overload that takes a function to be executed under the lock. This makes it possible for the consumer to operate the 'semaphore' without ever manually operating the lock (and still get the value of product without data-races):

semaphore sem;

void consumer() {
    do {
        bool stop = false;
        int received_product = sem.wait([&stop] { stop = done; return product; });

        if (stop)
            break;

        std::cout << received_product << std::endl;

        std::unique_lock<std::mutex> lock(processed_mutex);
        processed_signal.notify_one();
    } while(true);
}

A fully working demo: Live on Coliru:

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <cassert>

class semaphore
{
private:
    std::mutex mtx;
    std::condition_variable cv;
    int count;

public: 
    semaphore(int count_ = 0) : count(count_) { }

    void notify()
    {
        std::unique_lock<std::mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    void wait() { return wait([]{}); }  // no-op action

    template <typename F>
    auto wait(F&& func = []{}) -> decltype(std::declval<F>()())
    {
        std::unique_lock<std::mutex> lck(mtx);

        while(count == 0){
            cv.wait(lck);
        }
        count--;

        return func();
    }
};

semaphore sem;

int product = 0;
std::mutex processed_mutex;
std::condition_variable processed_signal;

bool done = false;

void consumer(int check) {
    do {
        bool stop = false;
        int received_product = sem.wait([&stop] { stop = done; return product; });

        if (stop)
            break;

        std::cout << received_product << std::endl;
        assert(++check == received_product);

        std::unique_lock<std::mutex> lock(processed_mutex);
        processed_signal.notify_one();
    } while(true);
}

void producer() {
    std::unique_lock<std::mutex> lock(processed_mutex);
    for(int i = 0; i < 10; ++i) {
        ++product;
        sem.notify();
        processed_signal.wait(lock);
    }
    done = true;
    sem.notify();
}

int main() {
    int t = 1000;
    while(t--) {
        std::thread consumerThread(&consumer, product);
        std::thread producerThread(&producer);
        producerThread.join();
        consumerThread.join();
        done = false;
        std::cout << "process end" << std::endl;
    }
    std::cout << "done" << std::endl;
}

Outras dicas

You seems to ignore that the variable done is also a shared state, to the same extend as product. Which can lead to several races conditions. In your case, I see at least one scenario where consumerThread make no progress:

  1. The loop execute has intended
  2. consumer executes, and is waiting at cv.wait(lock);
  3. producer has finished the for loop, and notify consumer and is preempted
  4. consumer wakes up, read "done==false", output product, read done == false again, wait on the condition
  5. producer set done to true and exit
  6. consumer is stuck forever

To avoid these kind of issues you should be holding a lock when reading or writing done. Btw your implementation is quite sequential, ie the producer and the consumer can only process a single piece of data at the time...

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top