Question

I wrote a Link class for passing data between two nodes in a network. I've implemented it with two deques (one for data going from node 0 to node 1, and the other for data going from node 1 to node 0). I'm trying to multithread the application, but I'm getting threadlocks. I'm trying to prevent reading from and writing to the same deque at the same time. In reading more about how I originally implemented this, I think I'm using the condition variables incorrectly (and maybe shouldn't be using the boolean variables?). Should I have two mutexes, one for each deque? Please help if you can. Thanks!

class Link {
public:
    // other stuff...
    void push_back(int sourceNodeID, Data newData);
    void get(int destinationNodeID, std::vector<Data> &linkData);

private:
    // other stuff...
    std::vector<int> nodeIDs_;
    // qVector_ has two deques, one for Data from node 0 to node 1 and
    // one for Data from node 1 to node 0
    std::vector<std::deque<Data> > qVector_;
    void initialize(int nodeID0, int nodeID1);

    boost::mutex mutex_;
    std::vector<boost::shared_ptr<boost::condition_variable> > readingCV_;
    std::vector<boost::shared_ptr<boost::condition_variable> > writingCV_;
    std::vector<bool> writingData_;
    std::vector<bool> readingData_;
};

The push_back function:

void Link::push_back(int sourceNodeID, Data newData)
{
    int idx;
    if (sourceNodeID == nodeIDs_[0]) idx = 1;
    else
    {
        if (sourceNodeID == nodeIDs_[1]) idx = 0;
        else throw runtime_error("Link::push_back: Invalid node ID");
    }

    boost::unique_lock<boost::mutex> lock(mutex_);
    // pause to avoid multithreading collisions
    while (readingData_[idx]) readingCV_[idx]->wait(lock);

    writingData_[idx] = true;
    qVector_[idx].push_back(newData);
    writingData_[idx] = false;
    writingCV_[idx]->notify_all();
}

The get function:

void Link::get(int destinationNodeID, std::vector<Data> &linkData)
{
    int idx;
    if (destinationNodeID == nodeIDs_[0]) idx = 0;
    else
    {
        if (destinationNodeID == nodeIDs_[1]) idx = 1;
        else throw runtime_error("Link::get: Invalid node ID");
    }

    boost::unique_lock<boost::mutex> lock(mutex_);
    // pause to avoid multithreading collisions
    while (writingData_[idx]) writingCV_[idx]->wait(lock);
    readingData_[idx] = true;

    std::copy(qVector_[idx].begin(), qVector_[idx].end(), back_inserter(linkData));
    qVector_[idx].erase(qVector_[idx].begin(), qVector_[idx].end());
    readingData_[idx] = false;
    readingCV_[idx]->notify_all();
    return;
}

and here's initialize (in case it's helpful):

void Link::initialize(int nodeID0, int nodeID1)
{
    readingData_ = std::vector<bool>(2, false);
    writingData_ = std::vector<bool>(2, false);
    for (int i = 0; i < 2; ++i)
    {
        readingCV_.push_back(make_shared<boost::condition_variable>());
        writingCV_.push_back(make_shared<boost::condition_variable>());
    }
    nodeIDs_.reserve(2);
    nodeIDs_.push_back(nodeID0);
    nodeIDs_.push_back(nodeID1);
    qVector_.reserve(2);
    qVector_.push_back(std::deque<Data>());
    qVector_.push_back(std::deque<Data>());
}

Solution

I'm trying to multithread the application, but I'm getting threadlocks.

What is a "threadlock"? It's difficult to see what your code is trying to accomplish. Consider, first, your push_back() code, whose synchronized portion looks like this:

boost::unique_lock<boost::mutex> lock(mutex_);

while (readingData_[idx]) readingCV_[idx]->wait(lock);

writingData_[idx] = true;
qVector_[idx].push_back(newData);
writingData_[idx] = false;
writingCV_[idx]->notify_all();

Your writingData_[idx] boolean starts off as false, and becomes true only momentarily while a thread has the mutex locked. By the time the mutex is released, it is false again. So for any other thread that has to wait to acquire the mutex, writingData_[idx] will never be true.

But in your get() code, you have

boost::unique_lock<boost::mutex> lock(mutex_);
// pause to avoid multithreading collisions
while (writingData_[idx]) writingCV_[idx]->wait(lock);

By the time a thread gets the lock on the mutex, writingData_[idx] is back to false, so the while loop (and the wait on the CV) is never entered.

An exactly symmetric analysis applies to the readingData_[idx] boolean, which is likewise always false outside the mutex lock.

So your condition variables are never waited on. You need to completely rethink your design.
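
To see the effect in isolation, here is a small standalone sketch (mine, not from the question) of the same pattern: a flag that is set and cleared only while a mutex is held is never observed as true by any other thread that locks the same mutex.

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>

boost::mutex mutex_;
bool flag_ = false;            // plays the role of writingData_[idx]

int main()
{
    int observedTrue = 0;

    // Writer: toggles the flag, but only while holding the mutex.
    boost::thread writer([]() {
        for (int i = 0; i < 100000; ++i)
        {
            boost::unique_lock<boost::mutex> lock(mutex_);
            flag_ = true;      // true only while this thread owns the mutex
            flag_ = false;     // cleared again before the mutex is released
        }
    });

    // Reader: can only inspect the flag after acquiring the mutex,
    // by which time it has already been cleared.
    boost::thread reader([&observedTrue]() {
        for (int i = 0; i < 100000; ++i)
        {
            boost::unique_lock<boost::mutex> lock(mutex_);
            if (flag_) ++observedTrue;
        }
    });

    writer.join();
    reader.join();
    std::cout << "flag seen true " << observedTrue << " times\n"; // prints 0
    return 0;
}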

Start with one mutex per queue (the deque is overkill for simply passing data), and for each queue associate a condition variable with the queue being non-empty. The get() method will thus wait until the queue is non-empty, which will be signalled in the push_back() method. Something like this (untested code):

#include <queue>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>

template <typename Data>
class BasicQueue
{
public:
    void push( Data const& data )
    {
        boost::unique_lock<boost::mutex> lock( mutex_ );
        queue_.push( data );
        not_empty_.notify_all();   // wake any consumer blocked in get()
    }

    void get( Data& data )
    {
        boost::unique_lock<boost::mutex> lock( mutex_ );
        while ( queue_.empty() )
            not_empty_.wait( lock ); // this releases the mutex
        // mutex is reacquired here, with the queue non-empty
        data = queue_.front();
        queue_.pop();
    }

private:
    std::queue<Data>            queue_;
    boost::mutex                mutex_;
    boost::condition_variable   not_empty_;
};
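
For reference, here is roughly how such a queue might be exercised from two threads. This usage sketch is mine (not part of the original answer) and assumes the BasicQueue template above is visible in the same translation unit:

#include <iostream>
#include <boost/thread/thread.hpp>

int main()
{
    BasicQueue<int> queue;

    // Producer: pushes ten values, notifying the consumer each time.
    boost::thread producer([&queue]() {
        for (int i = 0; i < 10; ++i)
            queue.push(i);
    });

    // Consumer: each get() blocks until the queue is non-empty.
    boost::thread consumer([&queue]() {
        for (int i = 0; i < 10; ++i)
        {
            int value;
            queue.get(value);
            std::cout << "got " << value << "\n";
        }
    });

    producer.join();
    consumer.join();
    return 0;
}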

Other suggestions

Yes. You need two mutexes. Your deadlocks are almost certainly a result of contention on the single mutex. If you break into your running program with a debugger you will see where the threads are hanging. Also, I don't see why you would need the bools. (EDIT: It may be possible to come up with a design that uses a single mutex, but it's simpler and safer to stick with one mutex per shared data structure.)

A rule of thumb would be to have one mutex per shared data structure you are trying to protect. That mutex guards the data structure against concurrent access and provides thread safety. In your case one mutex per deque. E.g.:

class SafeQueue
{
private:
  std::deque<Data> q_;
  boost::mutex m_;
  boost::condition_variable v_;

public:
  void push_back(Data newData)
  {
    boost::lock_guard<boost::mutex> lock(m_);
    q_.push_back(newData);
    v_.notify_one(); // notify a consumer waiting on v_ (see below)
  }
// ...
};

In terms of notification via condition variables see here:

Using condition variable in a producer-consumer situation

So there would also be one condition_variable per object which the producer would notify and the consumer would wait on. Now you can create two of these queues for communicating in both directions. Keep in mind that with only two threads you can still deadlock if both threads are blocked (waiting for data) and both queues are empty.
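
Putting the two answers together, the original Link could then be a thin wrapper around two such queues, one per direction, so that each direction gets its own mutex and condition variable. The sketch below is hypothetical and mine rather than part of either answer: TwoWayLink and indexFor are invented names, Data is a stand-in for the asker's payload type, and get() here hands back one item at a time instead of draining the queue into a vector. It reuses the BasicQueue template from the accepted answer:

#include <stdexcept>

struct Data { int value; };    // stand-in for the asker's Data type

class TwoWayLink
{
public:
    TwoWayLink(int nodeID0, int nodeID1)
    {
        nodeIDs_[0] = nodeID0;
        nodeIDs_[1] = nodeID1;
    }

    // Data leaving sourceNodeID goes into the queue read by the other node.
    void push_back(int sourceNodeID, Data const& newData)
    {
        queues_[1 - indexFor(sourceNodeID)].push(newData);
    }

    // Blocks until an item destined for destinationNodeID is available.
    void get(int destinationNodeID, Data &data)
    {
        queues_[indexFor(destinationNodeID)].get(data);
    }

private:
    int indexFor(int nodeID) const
    {
        if (nodeID == nodeIDs_[0]) return 0;
        if (nodeID == nodeIDs_[1]) return 1;
        throw std::runtime_error("TwoWayLink: invalid node ID");
    }

    int nodeIDs_[2];
    BasicQueue<Data> queues_[2];   // queues_[i] holds data destined for node i
};

As noted above, because get() blocks on an empty queue, two nodes that both call get() while both queues are empty will still hang; if that can happen in practice, one way out would be a timed or try-style variant of get().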
