Question

I have a class that implements a threaded producer/consumer system using a mutex and two condition variables for synchronization. The producer signals the consumer thread when there are items to use, and the consumer signals the producer thread when it has consumed the items. The threads continue producing and consuming until the destructor requests them to quit by setting a boolean variable. Because either of the threads may be waiting on a condition variable, I have to implement a second check of the quit variable, which feels wrong and messy...

I've reduced the problem down to the following (working on GNU/Linux with g++4.7) example:

// C++11and Boost required.
#include <cstdlib> // std::rand()
#include <cassert>

#include <boost/circular_buffer.hpp>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Creates a single producer and single consumer thread.
class prosumer
{
    public:
        // Create the circular buffer and start the producer and consumer thread.
        prosumer()
            : quit_{ false }
            , buffer_{ circular_buffer_capacity }
            , producer_{ &prosumer::producer_func, this }
            , consumer_{ &prosumer::consumer_func, this }
        {}

        // Set the quit flag and wait for the threads to exit.
        ~prosumer()
        {
            quit_ = true;
            producer_.join();
            consumer_.join();
        }

    private:
        // Thread entry point for the producer.
        void producer_func()
        {
            // Value to add to the ringbuffer to simulate data.
            int counter = 0;

            while ( quit_ == false )
            {
                // Simulate the production of some data.
                std::vector< int > produced_items;
                const auto items_to_produce = std::rand() % circular_buffer_capacity;
                for ( int i = 0; i < items_to_produce; ++i )
                {
                    produced_items.push_back( ++counter );
                }

                // Get a lock on the circular buffer.
                std::unique_lock< std::mutex > lock( buffer_lock_ );

                // Wait for the buffer to be emptied or the quit flag to be set.
                buffer_is_empty_.wait( lock, [this]()
                        {
                            return buffer_.empty() == true || quit_ != false;
                        } );

                // Check if the thread was requested to quit.
                if ( quit_ != false )
                {
                    // Don't let the consumer deadlock.
                    buffer_has_data_.notify_one();
                    break;
                }

                // The buffer is locked by this thread. Put the data into it.
                buffer_.insert( std::end( buffer_ ), std::begin( produced_items ), std::end( produced_items ) );

                // Notify the consumer that the buffer has some data in it.
                buffer_has_data_.notify_one();
            }
            std::cout << "producer thread quit\n";
        }


        // Thread entry for the consumer.
        void consumer_func()
        {
            int counter_check = 0;

            while ( quit_ == false )
            {
                std::unique_lock< std::mutex > lock( buffer_lock_ );

                // Wait for the buffer to have some data before trying to read from it.
                buffer_has_data_.wait( lock, [this]()
                        {
                            return buffer_.empty() == false || quit_ != false;
                        } );

                // Check if the thread was requested to quit.
                if ( quit_ != false )
                {
                    // Don't let the producer deadlock.
                    buffer_is_empty_.notify_one();
                    break;
                }

                // The buffer is locked by this thread. Simulate consuming the data.
                for ( auto i : buffer_ ) assert( i == ++counter_check );
                buffer_.clear();

                // Notify the producer thread that the buffer is empty.
                buffer_is_empty_.notify_one();
            }
            std::cout << "consumer thread quit\n";
        }

        // How many items the circular buffer can hold. 
        static const int circular_buffer_capacity = 64;

        // Flag set in the destructor to signal the threads to stop.
        std::atomic_bool quit_;

        // Circular buffer to hold items and a mutex for synchronization.
        std::mutex buffer_lock_;
        boost::circular_buffer< int > buffer_;

        // Condition variables for the threads to signal each other.
        std::condition_variable buffer_has_data_;
        std::condition_variable buffer_is_empty_;

        std::thread producer_;
        std::thread consumer_;
};


int main( int argc, char **argv )
{
    (void)argc; (void) argv;

    prosumer test;

    // Let the prosumer work for a little while.
    std::this_thread::sleep_for( std::chrono::seconds( 3 ) );

    return EXIT_SUCCESS;
}

If you look at the producer_func and consumer_func thread functions you can see that they loop until the quit variable is set by the prosumer destructor, but they also check for the quit variable again after they lock the circular buffer. If the quit variable was set, they signal each other to prevent a deadlock.

Another idea I had was to call notify_one() on the condition variables from the destructor, would that be a better solution?

Is there a better way to do this?

Update 1: I forgot to mention that in this instance, when the threads are requested to exit, the consumer does not need to consume any remaining data in the circular buffer and it's fine if the producer produces a little bit more too. As long as they both exit and don't deadlock all will be well.

Was it helpful?

Solution

In my opinion, calling notify_one (or rather notify_all if you were to extend your buffer to multiple producers/consumers) on both condition variables in the destructor before the calls to join would be the preferred solution for several reasons:

Firstly, this matches the way that conditional variables are typically used: By setting quit_, you change the state that the producer/consumer threads are interested in and wait for, so you should notify them of the state change.

Furthermore, notify_one should not be a very costly operation.

Also, in a more realistic application, it could be the case that in between the production of two elements, there is a delay; in that case you may not want to block in your destructor until the consumer notices it has to cancel as soon as the next element is enqueued; in the example code, that does not occur, as far as I can see.

OTHER TIPS

In my opinion, there are two functionalities that can be separated:

  1. message passing and dispatching
  2. producing and consuming

It does make sense to really separate them: the 'worker' thread does nothing more than process 'messages' that could mean 'quit' or 'do_work'.

This way you can create a generic 'worker' class that aggregates the actual function. The produce and consume methods stay clean, and the worker class care only about keeping the work going.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top