In a simple C++11 four-thread program, my two consumer threads do not return if I comment out the stdout print lines

StackOverflow https://stackoverflow.com/questions/16133010

I am learning my way through synchronization primitives with C++11. I have to write the methods of a template class that implements a FIFO queue whose maximum number of elements is declared at construction time.

There are two threads which push items into said queue and two that retrieve them. They are synchronized using two condition variables, to make sure that consumer threads only pop items when the queue is not empty and that producer threads only push new items when the queue is not full. The queue has an open/closed status which is used as an additional condition in the wait() call of both condition variables. When the queue is closed, threads should return without performing any operation.

// main.cpp
#include "stdafx.h"

int _tmain(int argc, _TCHAR* argv[]){
    BlockingQueue<int> bq(10);
    int sum1=0, sum2=0;

    std::thread c1([&bq,&sum1](){
        int i;
        while(bq.get(i)) sum1+=i;
    });
    std::thread c2([&bq,&sum2](){
        int i;
        while(bq.get(i)) sum2+=i;
    });
    std::thread p1([&bq](){
        for(int i=0;i<1000;i+=2) bq.put(i);
    });
    std::thread p2([&bq](){
        for(int i=0;i<1000;i+=2) bq.put(i+1);
    });
    p1.join();
    std::cout<<"p1 thread returned."<<std::endl;
    p2.join();
    std::cout<<"p2 thread returned."<<std::endl;
    bq.close();
    c1.join();
    std::cout<<"c1 thread returned."<<std::endl;
    c2.join();
    std::cout<<"c2 thread returned."<<std::endl;
    std::cout<<"sum1: "<<sum1<<std::endl;
    std::cout<<"sum2: "<<sum2<<std::endl;
    std::cout<<"total: "<<sum1+sum2<<std::endl;
    return 0;
}

Here's the class I've created:

// BlockingQueue.h
#include "stdafx.h"

template<class T> class BlockingQueue
{
    std::mutex t_queue_mutex;
    std::queue<T> t_queue;
    int t_queue_cap_value;
    bool isQueueOpen;
    std::condition_variable put_condition;
    std::condition_variable get_condition;
public:
    BlockingQueue(int N);
    ~BlockingQueue(void);
    bool put(T t_item);
    bool get(T &t_item);
    bool isOpen();
    bool isFull();
    bool isEmpty();
    void close();
};

// BlockingQueue.cpp
#include "BlockingQueue.h"
#include "stdafx.h"

template <class T> BlockingQueue<T>::BlockingQueue(int N)
{
    t_queue_cap_value=N;
    isQueueOpen=true;
    std::cout<<"Rejoice! A bq has been created!"<<std::endl;
}

template <class T> BlockingQueue<T>::~BlockingQueue(void)
{
}

template <class T> bool BlockingQueue<T>::isFull(){
    if(t_queue_cap_value==t_queue.size())
        return true;
    else
        return false;
}

template <class T> bool BlockingQueue<T>::isOpen(){
    return isQueueOpen;
}

template <class T> void BlockingQueue<T>::close(){
    isQueueOpen=false;
}

/* get method */
template <class T> bool BlockingQueue<T>::get(T &t_item){
    bool exitThreadStatus=false;

    if(!isOpen()){
        put_condition.notify_all();
        return false;
    }
    std::unique_lock<std::mutex> ul(t_queue_mutex);
    get_condition.wait(ul, [this](){
        //std::cout<<"Getter thread with get_id()="<<std::this_thread::get_id()<<" is waiting. isOpen()="<<isOpen()<<" and t_queue.empty()="<<t_queue.empty()<<std::endl;
        if(!isOpen())
            return true;
        else
            return !t_queue.empty();
    }); 
    if(isOpen()){
        exitThreadStatus=true;
        t_item=t_queue.front();
        t_queue.pop();
    }
    std::cout<<"Extracted "<<t_item<<". After pop size()="<<t_queue.size()<<std::endl;
    put_condition.notify_all();
    return exitThreadStatus;
}

/* put method */
template <class T> bool BlockingQueue<T>::put(T t_item){
    bool exitThreadStatus=false;

    if(!isOpen()){
        get_condition.notify_all();
        return false;
    }
    std::unique_lock<std::mutex> ul(t_queue_mutex);
    put_condition.wait(ul, [this](){
        if(!isOpen())
            return true;
        else
            return !isFull();
    }); 
    if(isOpen()){
        exitThreadStatus=true;
        t_queue.push(t_item);
    }
    std::cout<<"Inserting "<<t_item<<". After push size()="<<t_queue.size()<<std::endl;
    get_condition.notify_all();
    return exitThreadStatus;
}

template class BlockingQueue<int>;

It seems to be working correctly whenever I leave the two std::cout lines in get() and put() uncommented, getting the following output (as expected):

Inserting 998. After push size()=2
Extracted 997. After pop size()=1
p1 thread returned.
Inserting 999. After push size()=2
Extracted 998. After pop size()=1
p2 thread returned.
Extracted 999. After pop size()=0
Extracted 998. After pop size()=0
c1 thread returned.
c2 thread returned.
sum1: 250000
sum2: 249500
total: 499500

If I instead comment out the cout lines, the two consumer threads never seem to return, and I can't understand what is wrong with my code. Does anyone have a clue? Thank you!

Output with commented cout lines:

Rejoice! A bq has been created!
p1 thread returned.
p2 thread returned.

Solution

Try adding get_condition.notify_all() and put_condition.notify_all() to close(), after it sets isQueueOpen to false.

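As far as I can tell, if a thread is blocked in get_condition.wait() inside get() when close() is called, it stays in that wait forever: close() only sets isQueueOpen and never notifies either condition variable, so nothing wakes the waiting thread to re-check its predicate. Why this works with the cout statements I have no idea, though the documentation does mention "spurious wakeups". It may also simply be that the printing changes the timing, so that the consumers have not yet started waiting when close() runs.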
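
For what it's worth, here is a minimal sketch of that fix in a reduced version of the queue. The names ClosableQueue, run_demo and the member names are made up for illustration and are not from the question. Two assumptions go beyond the suggestion above: the flag is written while holding the mutex (otherwise the notification can still slip in between a waiter's predicate check and its going to sleep), and get() keeps returning items that are still queued after close(), which differs from the original get().

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical reduced version of the asker's BlockingQueue.
template <class T>
class ClosableQueue {
    std::mutex m_;
    std::condition_variable not_full_;   // plays the role of put_condition
    std::condition_variable not_empty_;  // plays the role of get_condition
    std::queue<T> items_;
    std::size_t capacity_;
    bool open_ = true;

public:
    explicit ClosableQueue(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Blocks while the queue is full; returns false once the queue is closed.
    bool put(T item) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return !open_ || items_.size() < capacity_; });
        if (!open_) return false;
        items_.push(std::move(item));
        not_empty_.notify_one();
        return true;
    }

    // Blocks while the queue is empty and open; returns false only when the
    // queue is closed AND drained (assumption: leftover items are still delivered).
    bool get(T& item) {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !open_ || !items_.empty(); });
        if (items_.empty()) return false;
        item = std::move(items_.front());
        items_.pop();
        not_full_.notify_one();
        return true;
    }

    // The actual fix: change the flag under the mutex, then wake every waiter
    // so each one re-evaluates its predicate and sees that the queue is closed.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            open_ = false;
        }
        not_full_.notify_all();
        not_empty_.notify_all();
    }
};

// Mirrors the question's main(): two producers, two consumers, close() after
// the producers have finished. Returns the sum of everything consumed.
int run_demo() {
    ClosableQueue<int> q(10);
    int sum1 = 0, sum2 = 0;
    std::thread c1([&] { int i; while (q.get(i)) sum1 += i; });
    std::thread c2([&] { int i; while (q.get(i)) sum2 += i; });
    std::thread p1([&] { for (int i = 0; i < 1000; i += 2) q.put(i); });
    std::thread p2([&] { for (int i = 0; i < 1000; i += 2) q.put(i + 1); });
    p1.join();
    p2.join();
    q.close();
    c1.join();
    c2.join();
    return sum1 + sum2;
}
```

Calling run_demo() from main() should come back with both consumers joined, with or without any printing, because the consumers no longer depend on a lucky wakeup to notice that the queue was closed.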

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow