Question

Below is a very simple Producer/Consumer problem example using a thread safe unbounded queue. Can anyone shed a little light on why this code behaves correctly when compiled with GNU C++ and yet the Consumer thread randomly gives up when compiled with LLVM C++?

#include <iostream>
#include <queue>
#include <math.h>
#include <time.h>
#include "boost/thread/condition_variable.hpp"
#include "boost/thread.hpp"

//
// THREAD SAFE QUEUE
//
// Unbounded FIFO wrapper around std::queue<Data>. Every member function takes
// the_mutex before touching the_queue; wait_and_pop() additionally blocks on
// the_condition_variable until push() has added an element.
template<typename Data>
class Concurrent_Queue
{
private:
    // Underlying storage; only accessed while the_mutex is held.
    std::queue<Data> the_queue;
    // Declared mutable so that the const member empty() can lock it.
    mutable boost::mutex the_mutex;
    // Waited on in wait_and_pop(), notified in push().
    boost::condition_variable the_condition_variable;
public:
    // Appends a copy of `data` to the queue and then notifies one waiter.
    // Note the ordering: the lock is released explicitly *before* the printf
    // and before notify_one(), so the notification is issued without holding
    // the_mutex.
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        printf("\n...just pushed, waking a thread...\n\n");
        the_condition_variable.notify_one();
    }

    // Returns whether the queue was empty at the moment the lock was held.
    // The lock is released on return, so the answer is only a snapshot.
    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    // Non-blocking pop. If the queue is empty, returns false and leaves
    // `popped_value` untouched; otherwise copies the front element into
    // `popped_value`, removes it from the queue and returns true.
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    // Blocking pop. While the queue is empty, prints a message and waits on
    // the condition variable; the emptiness check is repeated every time
    // wait() returns. Once an element is available it is copied into
    // `popped_value` and removed from the queue.
    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            printf("\n...buffer empty, waiting to pop...\n\n");
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }

    // Returns the number of queued elements, read under the lock and cast to
    // int. Like empty(), the value is only a snapshot. Not declared const.
    int len() {
        boost::mutex::scoped_lock lock(the_mutex);
        return (int)the_queue.size();
    }

};

//
// PRODUCER
//    
// Producer side of the demo: repeatedly sleeps for a random 1..5 seconds and
// then pushes that number of seconds into the shared queue.
class Producer {
private:
    // Queue shared with the consumer; not owned by this object.
    Concurrent_Queue<int> *buff;
    // Never initialized and never read anywhere in this class.
    int next;
public:
    // Stores the queue pointer and announces construction on stdout.
    Producer(Concurrent_Queue<int> *q)
        : buff(q)
    {
        printf("Prod up!\n");
    }

    ~Producer() {}

    // Thread body; loops forever and never returns.
    void run() {
        for (;;) {
            // Random pause of 1..5 seconds; the same value is the payload.
            const int pause_secs = (rand() % 5) + 1;
            sleep(pause_secs);

            printf("wait_time: %d\n", pause_secs);
            buff->push(pause_secs);

            // Queue length as observed right after the push.
            printf("buffer_len: %d\n", buff->len());
        }
    }
};

//
// CONSUMER 
//    
// Consumer side of the demo: repeatedly sleeps for a random 1..7 seconds and
// then takes one element from the shared queue, blocking if it is empty.
class Consumer {
private:
    // Queue shared with the producer; not owned by this object.
    Concurrent_Queue<int> * buff;
public:
    // Stores the queue pointer and announces construction on stdout.
    Consumer(Concurrent_Queue<int> *q)
        : buff(q)
    {
        printf("Con up!\n");
    }

    ~Consumer() {}

    // Thread body; loops forever and never returns.
    void run() {
        // Receives the most recently popped value.
        int latest = 0;
        for (;;) {
            // Random pause of 1..7 seconds before the next pop attempt.
            const unsigned pause_secs = (rand() % 7) + 1;
            sleep(pause_secs);

            buff->wait_and_pop(latest);
            printf("latest consumed int: %d\n", latest);

            // Queue length as observed right after the pop.
            printf("cons buff_len: %d\n", buff->len());
        }
    }
};

//
//  MAIN
//
// Entry point: seeds rand(), creates the shared queue plus one consumer and
// one producer, runs each on its own thread and joins both. Because both
// run() loops are infinite, the joins are not expected to return.
int main(int argc, const char * argv[])
{
    srand(static_cast<unsigned>(time(NULL)));
    Concurrent_Queue<int> Con_Q;
    Consumer taker(&Con_Q);
//  sleep(3);
    Producer giver(&Con_Q);

    // The thread objects live on the stack instead of being allocated with
    // `new` and never deleted. Con_Q, giver and taker are declared earlier in
    // this scope, so they outlive both thread objects.
    boost::thread prod_thread(boost::bind(&Producer::run, &giver));
    boost::thread cons_thread(boost::bind(&Consumer::run, &taker));

    prod_thread.join();
    cons_thread.join();
}
Was it helpful?

Solution

You should move the notify calls under the mutex.

This is documented somewhere in the pthreads(7) manpages. I'll try to find it.

Update: the most relevant quote I'm able to find at this time is:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

The pthread_cond_broadcast() and pthread_cond_signal() functions shall have no effect if there are no threads currently blocked on cond.

I know that thread checking tools like Helgrind complain if a condition is signaled outside of the lock.

Side notes:

  • I happen to have written a thread_pool with a task queue the other day, which also supports shutdown. You can try whether this suffers the symptoms on your Mac:

  • bool empty() const is not really useful, because it is a racy call: by the time the caller acts on the result, another thread may already have changed the queue. It would only be thread-safe if it transferred the lock to the caller

  • int len() const has the same issue
  • You can use the predicated version of cv::wait() to get cleaner code:

    // Blocking pop using the predicate overload of condition_variable::wait():
    // waits until the queue is non-empty, then copies the front element into
    // `popped_value` and removes it from the queue.
    void wait_and_pop(Data& popped_value)
    {
        namespace phx = boost::phoenix;
    
        // unique_lock (rather than lock_guard) because wait() takes the lock object.
        boost::unique_lock<boost::mutex> lock(the_mutex);
    
        //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
    
        // The Phoenix expression is the predicate "!the_queue.empty()"; it
        // replaces a hand-written while(the_queue.empty()) wait(lock) loop.
        the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
    
        popped_value = the_queue.front();
        the_queue.pop();
    }
    
  • I'd prefer to use the C++11-like interfaces (unique_lock<> over mutex::scoped_lock) so it's easier to switch to the C++11 standard library later.

  • the producer had an unused field next - I removed it

Here's my version with tiny modifications, so you can copy/paste it to check on macOS (I don't have a Mac):

#include <iostream>
#include <queue>
#include "boost/thread.hpp"
#include "boost/phoenix.hpp"

//
// THREAD SAFE QUEUE
//
// Unbounded FIFO wrapper around std::queue<Data>. Every member function takes
// the_mutex before touching the_queue; wait_and_pop() blocks on
// the_condition_variable until an element is available. In this version the
// notification in push() is issued while the mutex is still held.
template<typename Data>
class Concurrent_Queue
{
private:
    typedef std::queue<Data> queue_t;
    // Underlying storage; only accessed while the_mutex is held.
    queue_t the_queue;

    // Declared mutable so that const members (empty(), len()) can lock it.
    mutable boost::mutex the_mutex;
    // Waited on in wait_and_pop(), notified in push().
    boost::condition_variable the_condition_variable;
public:
    // Appends a copy of `data` and notifies one waiter. The lock_guard is
    // released only when the function returns, i.e. after notify_one().
    void push(Data const& data)
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        the_queue.push(data);

        printf("\n...just pushed, waking a thread...\n\n");
        the_condition_variable.notify_one();
    }

#ifdef UNUSED_CODE
    // Returns whether the queue was empty while the lock was held. The lock
    // is released on return, so the result is only a snapshot.
    bool empty() const
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        return the_queue.empty();
    }

    // Non-blocking pop. If the queue is empty, returns false and leaves
    // `popped_value` untouched; otherwise copies the front element into
    // `popped_value`, removes it and returns true.
    bool try_pop(Data& popped_value)
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }
#endif

    // Blocking pop using the predicate overload of condition_variable::wait():
    // waits until the queue is non-empty, then copies the front element into
    // `popped_value` and removes it from the queue.
    void wait_and_pop(Data& popped_value)
    {
        namespace phx = boost::phoenix;

        // unique_lock (rather than lock_guard) because wait() takes the lock object.
        boost::unique_lock<boost::mutex> lock(the_mutex);

        //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");

        // The Phoenix expression is the predicate "!the_queue.empty()".
        the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));

        popped_value = the_queue.front();
        the_queue.pop();
    }

    // Returns the number of queued elements, read under the lock. Const
    // because it only observes the queue; the mutable mutex makes this
    // possible. The value is only a snapshot once the lock is released.
    std::size_t len() const {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        return the_queue.size();
    }

};

//
// PRODUCER
//    
// Producer side of the demo: repeatedly sleeps for a random 1..5 seconds and
// then pushes that number of seconds into the shared queue.
class Producer {
private:
    // Queue shared with the consumer; held by reference, not owned.
    Concurrent_Queue<int> &buff;
public:
    // Binds the queue reference and announces construction on stdout.
    Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
    ~Producer() {}

    // Thread body; loops forever and never returns.
    void run() {
        while(1) {
            // Random pause of 1..5 seconds; the same value is the payload.
            const int wait_time = (rand()%5)+1;
            boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
            printf("wait_time: %d\n", wait_time);
            buff.push(wait_time);
            // len() returns std::size_t, which is not guaranteed to be
            // unsigned long; cast so the argument really matches "%lu".
            printf("buffer_len: %lu\n", static_cast<unsigned long>(buff.len()));
        }
    }
};

//
// CONSUMER 
//    
// Consumer side of the demo: repeatedly sleeps for a random 1..7 seconds and
// then takes one element from the shared queue, blocking if it is empty.
class Consumer {
private:
    // Queue shared with the producer; held by reference, not owned.
    Concurrent_Queue<int> & buff;
public:
    // Binds the queue reference and announces construction on stdout.
    Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
    ~Consumer() {}

    // Thread body; loops forever and never returns.
    void run() {
        // Receives the most recently popped value.
        int latest = 0;
        while(1) {
            // Random pause of 1..7 seconds before the next pop attempt.
            const unsigned wait_time = (rand()%7)+1;
            boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
            buff.wait_and_pop(latest);
            printf("latest consumed int: %d\n", latest);
            // len() returns std::size_t, which is not guaranteed to be
            // unsigned long; cast so the argument really matches "%lu".
            printf("cons buff_len: %lu\n", static_cast<unsigned long>(buff.len()));
        }
    }
};

//
//  MAIN
//
int main()
{
    srand((unsigned)time(NULL));
    Concurrent_Queue<int> Con_Q;

    Consumer taker(Con_Q);
    //boost::this_thread::sleep_for(boost::chrono::seconds(3));
    Producer giver(Con_Q);

    boost::thread_group group;
    group.create_thread(boost::bind(&Producer::run, &giver));
    group.create_thread(boost::bind(&Consumer::run, &taker));

    group.join_all();
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top