I am working on a problem that simulates an extended producer-worker model. There are 3 workers and 3 tools, and a worker needs 2 tools to work (and materials, but those are irrelevant here). If there are >= 2 tools in the vault, a worker takes 2. Otherwise, it waits on a condition variable that is signaled when there are >= 2.

This works fine with 2 workers: one works and then returns the tools to the vault, and the waiting worker is awakened and takes 2 tools. The problem is that, with 3 workers, one of them always starves.

After some testing I've noticed that threads waiting on a condition variable seem to be woken in stack (LIFO) order. Is there any way to make it queue (FIFO) order? (1 waits, 2 waits, then 3 waits. When 1 is awakened and wants to make another product, it has to wait behind 2 and 3.)

Here is a sample output. The code is too long, so I'll post it only if it's really necessary. There are 3 worker threads and 1 tool mutex. Which worker starves differs from run to run.

1 Tools taken. Remaining: 1
2 Waiting on tools...
3 Waiting on tools...
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
...

(As you can see 2 never gets the tools...)

Update: 2013/07/05 I have added some code.

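int tools = 3; //global
string last; //current last product on output buffer
mutex toolMutex; //guards tools (name matches its uses below)
condition_variable toolsCV; //signaled when tools are returned
mutex matSearchMutex;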

int main(){
//Initializing Producers
    Producer prod1(1);
    Producer prod2(2);
    Producer prod3(3);



    thread p1(processor,1);
    thread p2(processor,2);
    thread p3(processor,3);

    // Join instead of detaching and spinning in an empty loop: the workers
    // never return, so main blocks here without burning a CPU core.
    p1.join();
    p2.join();
    p3.join();

    return 0;
}

Processor:

  //Processor method
void processor(int i){
    srand(time(NULL));

    while (true){ //forever running
        bool hasTools = false;
        bool productMade = false;
        while (productMade == false){ //while product has yet to be made.
            //choose what to make...

            if (hasTools == false){
                thread matT(getMaterials,whatToMake);
                thread toolT(getTools,i);
                toolT.join();
                matT.join();
                hasTools = true;
            }
            else{ //tools acquired but no materials
                thread matT(getMaterials,whatToMake);
                matT.join();
            }

            if (recordedLast.compare(last) != 0){
                //return materials and acquire new ones the next run
                continue;
            }
            else {
                makeProduct(whatToMake);
                unique_lock<mutex> locker(toolMutex);
                tools = tools + 2;
                cout << i << " Operator Product made. Tools returned. Tools now:" << tools << endl;
                productMade = true;
                if (tools >= 2) toolsCV.notify_one();
            }
            //done processing
        }
    }
}

makeProducts:

void makeProduct(int i){
    unique_lock<mutex> mainMatLock(matSearchMutex); 
    // make product according to i
    this_thread::sleep_for(chrono::milliseconds(rand() % 1000 + 10));   
}

getTools:

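void getTools(int i){
    unique_lock<mutex> locker(toolMutex);
    // Re-check in a loop: wait() can wake spuriously, and another worker
    // may have taken the tools between the notify and this wake-up.
    while (tools < 2){
        cout << i << " Waiting on tools..." << endl;
        toolsCV.wait(locker);
    }
    tools = tools - 2; //tools acquired
    cout << i << " Tools taken. Remaining: " << tools << endl;
}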

Thanks to those who have replied. I'll try to implement a waiting queue tonight using multiple condition variables.

(P.S. Is there a better way to format code here on Stack Overflow, other than the four spaces?)

Solution

std::condition_variable does not specify which waiting thread is woken when you call notify_one. You should therefore write code that doesn't care which thread is woken. The standard pattern is that whichever thread is woken, that thread should do the work that needs to be done.

If you require that the threads are woken in a specific order, then use a different mechanism. You could, for example, have a separate std::condition_variable for each thread, and then put the threads in a queue when they need tools. As a thread hands in the tools, it could then signal the condition variable corresponding to the thread at the front of the queue. That thread will then be woken, and the others will remain sleeping (modulo spurious wake-ups).

Other tips

When there are several threads waiting on a condition, the order in which they are awakened (notify_all) or which one is awakened (notify_one) is unspecified. If you need some sort of ordering, you need to use notify_all, and implement it yourself. You can keep a queue of the threads waiting: before waiting (but after acquiring the mutex), push the thread id onto the end of the queue. In the loop, loop on "this thread at front of queue and necessary tools available". When you get the tools, remove the id from the front of the queue, and call notify_all again.

One approach might be to use a fully fledged Semaphore that is shared between the threads instead of a condition variable. That way, you can wait for a specific count.

#include <mutex>
#include <thread>
#include <condition_variable>

using std::mutex;
using std::condition_variable;
using std::unique_lock; // used by the implementation below

class Semaphore
{
public:
    /**
     * Construct a counting semaphore with an initial value
     * @param cnt The value of the initial semaphore count
     */
    explicit Semaphore(unsigned int cnt) : cnt(cnt) {}

    /**
     * Acquire semaphore counts, blocking until they are available
     * @param numRes The number of count resources to acquire
     */
    void acquire(unsigned int numRes = 1);

    /**
     * Try to acquire semaphore counts without blocking.
     * @param numRes The number of count resources that the method tries to acquire
     * @return true, if numRes could be acquired
     *         false, otherwise
     */
    bool tryAcquire(unsigned int numRes = 1);

    /**
     * Release semaphore counts
     * @param numRes The number of count resources to release
     */
    void release(unsigned int numRes = 1);

private:
    unsigned int cnt;
    mutex mut;
    condition_variable cond;
};

Implementation looks like:

void Semaphore::acquire(unsigned int numRes)
{
    unique_lock<mutex> lock(mut);
    while (cnt < numRes)
    {
        cond.wait(lock);
    }

    cnt-=numRes;
}

bool Semaphore::tryAcquire(unsigned int numRes)
{
    unique_lock<mutex> lock(mut);
    if (cnt>=numRes)
    {
        cnt -= numRes;
        return true;
    }
    return false;
}

void Semaphore::release(unsigned int numRes)
{
    {
        unique_lock<mutex> lock(mut);
        cnt += numRes;
    }
    // Wake every waiter: waiters may need different counts, so a single
    // notify_one could wake one that still cannot proceed while another
    // that could proceed stays asleep.
    cond.notify_all();
}

The real issue here is that if you have worker threads and a limited amount of the needed resources, you should not care which thread is actually activated; you should only care that the work gets done. The only difference here is in the logging. The number of threads you defined is the maximum number that could run in parallel, but the resources limit that to one.

If this is not okay for you, then you need to take action yourself, as explained in the other answers.

Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow