Question

Condition variables use a mutex, and the .wait() function unlocks the mutex so another thread can access the shared data. When the condition variable is notified, wait() re-locks the mutex before returning so the woken thread can use the shared data.

This pattern is used in the following concurrent_queue example from Anthony Williams:

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock); //releases the_mutex while sleeping, re-locks it before returning
        }
    }
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
};

Since the code uses std::queue it's clear that the mutex has to be locked when accessing the queue. But let's say that instead of std::queue one uses Microsoft's Concurrency::concurrent_queue from the PPL, whose member functions like empty, push and try_pop are thread safe. Do I still need to lock a mutex in this case, or can the condition variable be used like this without creating any possible race conditions?

My code (which seems to work, but what does that mean in multithreading?) looks like this. I have one producer that pushes items into Microsoft's concurrent_queue and one background thread that waits for new items in this queue.

The consumer/background thread:

while(runFlag) //atomic
{
    while(the_queue.empty() && runFlag) //wait only when thread should still run
    {
        boost::mutex mtx; //local mutex that's locked right afterwards. awkward.
        boost::mutex::scoped_lock lock(mtx);
        condition.wait(lock);
    }

    Data d;
    while(!the_queue.empty() && the_queue.try_pop(d))
    {
       //process data
    }
}

The producer/main thread:

const bool was_empty = the_queue.empty();
Data d; 
the_queue.push(d);
if(was_empty) cond_var.notify_one();

The shutdown procedure:

bool expected_run_state = true;
if(runFlag.compare_exchange_strong(expected_run_state, false))
{
    //atomically set our loop flag to false and 
    //notify all clients of the queue to wake up and exit
    cond_var.notify_all();
}

As said above, this code seems to work, but that doesn't necessarily mean it's correct. Especially the local mutex, which is only there because the condition variable interface forces me to use one, seems like a very bad idea. I wanted to use condition variables since the time between data items being added to the queue is hard to predict; otherwise I would have to make the thread sleep and wake up periodically like this:

if(the_queue.empty()) Sleep(short_amount_of_time);

Are there any other, maybe OS-specific (in my case: Windows) tools that make a background thread sleep until some condition is met, without regularly waking up and checking the condition?


Solution

The code is not correct in several scenarios. For example: if the queue holds a single element when const bool was_empty = the_queue.empty(); is evaluated, but one consumer then pops that element and another consumer finds the queue empty and blocks on the condition variable, the writer will not notify that waiting thread after inserting its element into the queue, because was_empty was false.

The key issue is that the fact that every individual operation of an interface is thread safe does not necessarily mean that your use of the interface is safe. If you depend on multiple operations being performed atomically (here: checking the queue and deciding whether to wait, or checking emptiness and deciding whether to notify), you need to provide synchronization mechanisms externally. The local mutex in your consumer does not provide that: a notify_one() issued by the producer between the consumer's the_queue.empty() check and its condition.wait() call wakes nobody and is lost, and since the producer never locks the mutex the consumer waits on, nothing closes that window, so the consumer may sleep indefinitely.
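A common fix is to keep one shared mutex purely for the wait/notify handshake, even though the concurrent queue itself needs no lock: the consumer re-checks the predicate under that mutex before waiting, and the producer takes the same mutex (briefly) before notifying. The following sketch is only one possible arrangement, not code from the question or from the PPL documentation; the names signal_mutex, produce and consume are invented, while the_queue, condition, runFlag and Data are reused from the question.

#include <atomic>
#include <concurrent_queue.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>

struct Data { /* payload */ };                //placeholder element type

Concurrency::concurrent_queue<Data> the_queue;
boost::mutex signal_mutex;                    //guards only the wait/notify handshake
boost::condition_variable condition;
std::atomic<bool> runFlag(true);

//Consumer: the emptiness check and the wait happen under the same mutex
//the producer locks before notifying, so no notification can slip through
//between the check and the wait.
void consume()
{
    while(runFlag)
    {
        Data d;
        if(the_queue.try_pop(d))
        {
            //process data
            continue;
        }
        boost::mutex::scoped_lock lock(signal_mutex);
        while(the_queue.empty() && runFlag)
        {
            condition.wait(lock);             //atomically unlocks, sleeps, re-locks
        }
    }
}

//Producer: push first, then take the mutex and notify. A consumer that saw
//the queue empty either still holds signal_mutex (so the producer blocks here
//until the consumer is actually inside wait()) or is already waiting.
void produce(Data const& d)
{
    the_queue.push(d);
    boost::mutex::scoped_lock lock(signal_mutex);
    condition.notify_one();
}

For shutdown you would, as in the question, set runFlag to false and then lock signal_mutex and call condition.notify_all() so that every waiter re-checks the predicate and exits.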

OTHER TIPS

Are there any other, maybe OS-specific (in my case: Windows) tools that make a background thread sleep until some condition is met, without regularly waking up and checking the condition?

This is exactly what Windows Events are for.
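For example (a minimal sketch, not the answer's code; the names data_event, produce and consume are invented, and the PPL queue from the question is reused): an auto-reset event stays signalled until one wait consumes it, so a SetEvent fired while no thread is waiting is not lost. Multiple SetEvent calls do coalesce, though, which is why the consumer drains the queue after every wake-up.

#include <windows.h>
#include <concurrent_queue.h>

Concurrency::concurrent_queue<int> the_queue;
//auto-reset event, initially non-signalled
HANDLE data_event = CreateEvent(NULL, FALSE, FALSE, NULL);

void produce(int value)
{
    the_queue.push(value);
    SetEvent(data_event);                  //wake at most one sleeping consumer
}

void consume()
{
    for(;;)
    {
        WaitForSingleObject(data_event, INFINITE);
        int value;
        //several pushes may have coalesced into one wake-up, so drain the queue
        while(the_queue.try_pop(value))
        {
            //process value
        }
    }
}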

But if you are targeting only the Windows platform (Vista+), you should check out Slim Reader/Writer (SRW) Locks.
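SRW locks by themselves only give you mutual exclusion; for the "sleep until there is data" part they are usually paired with the native Windows condition variables (also Vista+), which can block on an SRW lock via SleepConditionVariableSRW. A minimal sketch of that combination, with invented names and a plain std::queue for simplicity:

#include <windows.h>
#include <queue>

std::queue<int> the_queue;
SRWLOCK queue_lock = SRWLOCK_INIT;
CONDITION_VARIABLE queue_cv = CONDITION_VARIABLE_INIT;

void produce(int value)
{
    AcquireSRWLockExclusive(&queue_lock);
    the_queue.push(value);
    ReleaseSRWLockExclusive(&queue_lock);
    WakeConditionVariable(&queue_cv);      //wake one waiting consumer
}

void consume()
{
    for(;;)
    {
        AcquireSRWLockExclusive(&queue_lock);
        while(the_queue.empty())
        {
            //atomically releases the SRW lock, sleeps, re-acquires it on wake
            SleepConditionVariableSRW(&queue_cv, &queue_lock, INFINITE, 0);
        }
        int value = the_queue.front();
        the_queue.pop();
        ReleaseSRWLockExclusive(&queue_lock);
        //process value
    }
}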

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow