You should move the notify calls under the mutex.
This is documented somewhere in the pthreads(7) manpages. I'll try to find it.
Update the most relevant quote I'm able to find at this time is:
The
pthread_cond_broadcast()
orpthread_cond_signal()
functions may be called by a thread whether or not it currently owns the mutex that threads callingpthread_cond_wait()
orpthread_cond_timedwait()
have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread callingpthread_cond_broadcast()
orpthread_cond_signal()
.The
pthread_cond_broadcast()
andpthread_cond_signal()
functions shall have no effect if there are no threads currently blocked on cond.
I know that thread checking tools like Helgrind complain if a condition is signaled outside of the lock.
Side notes:
I happen to have written a thread_pool with a task queue the other day, which also supports shutdown. You can try whether this suffers the symptoms on your Mac:
bool empty() const
is not really useful, because it is a racey call. It would only be thread-safe if it transferred the lock to the callerint len() const
has the same issueYou can use the predicated version of
cv::wait()
to get cleaner code:void wait_and_pop(Data& popped_value) { namespace phx = boost::phoenix; boost::unique_lock<boost::mutex> lock(the_mutex); //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n"); the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue))); popped_value = the_queue.front(); the_queue.pop(); }
I'd prefer to use the c++11-similar interfaces (
unique_lock<>
overmutex::scoped_lock
) so it's easier to switch.- the producer had an unused field
next
- I removed it
Here's my version with tiny modifications, so you can copy/paste to check on MacOS (I don't have a Mac):
#include <iostream>
#include <queue>
#include "boost/thread.hpp"
#include "boost/phoenix.hpp"
//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
typedef std::queue<Data> queue_t;
queue_t the_queue;
mutable boost::mutex the_mutex;
boost::condition_variable the_condition_variable;
public:
void push(Data const& data)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
the_queue.push(data);
printf("\n...just pushed, waking a thread...\n\n");
the_condition_variable.notify_one();
}
#ifdef UNUSED_CODE
bool empty() const
{
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.empty();
}
bool try_pop(Data& popped_value)
{
boost::lock_guard<boost::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
#endif
void wait_and_pop(Data& popped_value)
{
namespace phx = boost::phoenix;
boost::unique_lock<boost::mutex> lock(the_mutex);
//if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
popped_value = the_queue.front();
the_queue.pop();
}
std::size_t len() {
boost::lock_guard<boost::mutex> lock(the_mutex);
return the_queue.size();
}
};
//
// PRODUCER
//
class Producer {
private:
Concurrent_Queue<int> &buff;
public:
Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
~Producer() {}
void run() {
int wait_time = 0;
while(1) {
wait_time = (rand()%5)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
printf("wait_time: %d\n", wait_time);
buff.push(wait_time);
printf("buffer_len: %lu\n", buff.len());
}
}
};
//
// CONSUMER
//
class Consumer {
private:
Concurrent_Queue<int> & buff;
public:
Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
~Consumer() {}
void run() {
unsigned wait_time = 0;
int latest = 0;
while(1) {
wait_time = (rand()%7)+1;
boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
buff.wait_and_pop(latest);
printf("latest consumed int: %d\n", latest);
printf("cons buff_len: %lu\n", buff.len());
}
}
};
//
// MAIN
//
int main()
{
srand((unsigned)time(NULL));
Concurrent_Queue<int> Con_Q;
Consumer taker(Con_Q);
//boost::this_thread::sleep_for(boost::chrono::seconds(3));
Producer giver(Con_Q);
boost::thread_group group;
group.create_thread(boost::bind(&Producer::run, &giver));
group.create_thread(boost::bind(&Consumer::run, &taker));
group.join_all();
}