Question

I am trying to understand the basic multithreading mechanisms in the new C++11 standard. The most basic example I can think of is the following:

  • A producer and a consumer are implemented in separate threads
  • The producer places a certain amount of items inside a queue
  • The consumer takes items from the queue if there are any present

This example is also used in many textbooks about multithreading, and everything about the communication process works fine. However, I have a problem when it comes to stopping the consumer thread.

I want the consumer to run until it gets an explicit stop signal (in most cases this means that I wait for the producer to finish so I can stop the consumer before the program ends). Unfortunately, C++11 threads lack an interrupt mechanism (which I know from multithreading in Java, for example). Thus, I have to use flags like isRunning to signal that I want a thread to stop.

The main problem now is: After I have stopped the producer thread, the queue is empty and the consumer is waiting on a condition_variable to get a signal when the queue is filled again. So I need to wake the thread up by calling notify_all() on the variable before exiting.

I have found a working solution, but it seems somehow messy. The example code is listed below (I am sorry, but I couldn't reduce the code any further to get a truly minimal example):

The Queue class:

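#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

class Queue{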
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });

        if (m_isProgramStopped){
            // std::exception has no standard constructor taking a message (that is an MSVC extension).
            throw std::runtime_error("Program stopped!");
        }

        int x = m_q.front();
        m_q.pop();

        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

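    void stop(){
        {
            // Set the flag under the mutex: pop() reads it under the same lock,
            // so a waiter cannot miss the change between its check and blocking.
            std::lock_guard<std::mutex> lock(m_mtx);
            m_isProgramStopped = true;
        }
        m_cond.notify_all();
    }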

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};

The Producer:

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_counter;
};

The Consumer:

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (const std::exception &){ // catch by const reference to avoid slicing
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_takeCounter;
    std::atomic<bool> m_isRunning; // written by the main thread, read by the consumer thread
};

And finally the main():

int main(void){
    Queue q;

    Consumer cons{ q };
    Producer prod{ q };

    cons.execute();
    prod.execute();

    prod.join();

    cons.stop();
    q.stop();

    cons.join();

    std::cout << "END" << std::endl;

    return EXIT_SUCCESS;
}

Is this the right way to end a thread that is waiting on a condition variable, or are there better methods? Currently, the queue needs to know whether the program has stopped (which in my opinion destroys the loose coupling of the components), and I need to call stop() on the queue explicitly, which doesn't seem right.

Additionally, the condition variable, which should just be used as a signal that the queue is no longer empty, now stands for another condition: whether the program has ended. If I am not mistaken, every time a thread waits on a condition variable for some event to happen, it would also have to check whether the thread has to be stopped before continuing its execution (which also seems wrong).

Do I have these problems because my entire design is faulty or am I missing some mechanisms that can be used to exit threads in a clean way?

Solution

No, there's nothing wrong with your design, and it's the normal approach taken for this sort of problem.

It's perfectly valid to attach multiple conditions (e.g. there is something on the queue, or the program is stopping) to a single condition variable. The key thing is that each part of the condition is checked when the wait returns.
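To make that concrete, here is a minimal sketch of what the predicate form of wait amounts to: a loop that re-checks both parts of the condition after every wakeup. The names Mailbox, put, close, take and drain_demo are made up for this illustration and are not from the question's code.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical names for illustration only.
struct Mailbox {
    std::mutex mtx;
    std::condition_variable cond;
    std::queue<int> items;
    bool stopped = false;

    void put(int v) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            items.push(v);
        }
        cond.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            stopped = true;
        }
        cond.notify_all();
    }

    // Returns true and fills 'out' if an item was taken,
    // false if the mailbox was closed and nothing is left.
    bool take(int &out) {
        std::unique_lock<std::mutex> lock(mtx);
        // Same effect as:
        //   cond.wait(lock, [&]{ return !items.empty() || stopped; });
        while (items.empty() && !stopped) {
            cond.wait(lock);  // may wake spuriously, hence the loop
        }
        if (items.empty()) {
            return false;     // woken because of close()
        }
        out = items.front();
        items.pop();
        return true;
    }
};

// One consumer thread sums everything it receives until the mailbox closes.
int drain_demo() {
    Mailbox box;
    int sum = 0;
    std::thread consumer([&] {
        int v;
        while (box.take(v)) {
            sum += v;
        }
    });
    for (int i = 1; i <= 5; ++i) {
        box.put(i);
    }
    box.close();
    consumer.join();
    return sum;
}
```

After the wait, the code looks at the queue first and only then treats the wakeup as a stop, so items pushed before close() are still delivered. That ordering is a choice made in this sketch; the question's pop() checks the stop flag first.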

Instead of having a flag in Queue that indicates the program is stopping, think of the flag as meaning "can I accept items?". That makes it a property of the queue rather than of the program, which is a better overall paradigm and works better in a multi-threaded environment.

Also, instead of having pop throw an exception when it is called after stop, you could replace it with bool try_pop(int &value), which returns true if a value was returned and false otherwise. On failure the caller can then check whether the queue has been stopped (add a bool is_stopped() const method). Exception handling works here, but it is a bit heavy-handed, since a stopped queue isn't really an exceptional case in a multi-threaded program.
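A rough sketch of how those two suggestions could look together follows. Only try_pop and is_stopped come from the answer above; the class name StoppableQueue, the m_accepting member, the bool return value of push and the run_try_pop_demo helper are assumptions made for the example. Note that try_pop here still blocks like the original pop, and it drains remaining items before reporting failure, which is a choice of this sketch.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Sketch only: StoppableQueue and m_accepting are illustrative names.
class StoppableQueue {
public:
    // Returns false if the queue no longer accepts items.
    bool push(int value) {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            if (!m_accepting) {
                return false;
            }
            m_q.push(value);
        }
        m_cond.notify_one();
        return true;
    }

    // Blocks until an item is available or the queue has been stopped.
    // Returns true and writes 'value' on success, false once the queue
    // is stopped and empty.
    bool try_pop(int &value) {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [this] { return !m_q.empty() || !m_accepting; });
        if (m_q.empty()) {
            return false;
        }
        value = m_q.front();
        m_q.pop();
        return true;
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_accepting = false;
        }
        m_cond.notify_all();
    }

    bool is_stopped() const {
        std::lock_guard<std::mutex> lock(m_mtx);
        return !m_accepting;
    }

private:
    mutable std::mutex m_mtx;  // mutable so is_stopped() can stay const
    std::condition_variable m_cond;
    std::queue<int> m_q;
    bool m_accepting = true;
};

// Hypothetical demo helper: returns how many items the consumer received.
int run_try_pop_demo() {
    StoppableQueue q;
    int taken = 0;
    std::thread consumer([&] {
        int v;
        while (q.try_pop(v)) {  // the loop ends without any exception
            ++taken;
        }
    });
    for (int i = 0; i < 5; ++i) {
        q.push(i);
    }
    q.stop();
    consumer.join();
    return taken;
}
```

With this shape the consumer no longer needs its own isRunning flag or a try/catch block: a false return from try_pop is the stop signal.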

Other suggestions

wait can be called with a timeout (wait_for or wait_until on the condition variable). Control then returns to the thread periodically, so the stop flag can be checked. Depending on its value, the thread either goes back to waiting for more items to consume or finishes execution. A good introduction to multithreading with C++ is C++11 Concurrency.
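As a sketch of the timeout approach, the consumer below uses std::condition_variable::wait_for and checks a stop flag between waits. TimedQueue, pop_for, run_timeout_demo and the 50 ms interval are assumptions chosen for the example, not part of the question's code.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical names for illustration only.
struct TimedQueue {
    std::mutex mtx;
    std::condition_variable cond;
    std::queue<int> items;

    void push(int v) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            items.push(v);
        }
        cond.notify_one();
    }

    // Waits at most 'timeout' for an item; returns false if none arrived.
    bool pop_for(int &out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx);
        if (!cond.wait_for(lock, timeout, [this] { return !items.empty(); })) {
            return false;  // timed out with the queue still empty
        }
        out = items.front();
        items.pop();
        return true;
    }
};

// Returns how many items the consumer received before shutting down.
int run_timeout_demo() {
    TimedQueue q;
    std::atomic<bool> running{true};
    int taken = 0;
    std::thread consumer([&] {
        int v;
        while (running) {
            // Wake up at least every 50 ms to re-check the stop flag.
            if (q.pop_for(v, std::chrono::milliseconds(50))) {
                ++taken;
            }
        }
        // Pick up anything still queued when the stop was requested.
        while (q.pop_for(v, std::chrono::milliseconds(0))) {
            ++taken;
        }
    });
    for (int i = 0; i < 5; ++i) {
        q.push(i);
    }
    running = false;  // no notify needed; the consumer notices on its own
    consumer.join();
    return taken;
}
```

The queue stays unaware of the shutdown here, which addresses the coupling concern, but the consumer polls, and stopping can take up to one timeout interval instead of happening immediately as with notify_all().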

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow