Question

I want to be sure I understand how condition variables work, so I will use a program I wrote to ask my question.

In my program I have one "producer" thread and several "worker" threads (let us assume 3).

The producer thread "handles" a FIFO linked list: all it does is check whether there is an item (called a request, of type Req, in my program) at the beginning of the list (which is pointed to by a global pointer called front) and, if so, assign it to a global request variable (called globalReq).

The worker threads run in a loop, waiting for requests to be handled: each one copies the global request variable into a local variable of its own (which is "private" to each of them because each thread has an independent stack; correct me if I'm wrong), and then handles the request.

In order to do this I use a mutex along with a condition variable.

An important note: once a request exists (for the moment, let us assume only one exists), IT DOES NOT MATTER which of the worker threads takes care of it (assuming they are all "free", i.e. sleeping on the condition variable).

After a request has been extracted and assigned to the global request, the producer thread calls pthread_cond_signal, which, as far as I know, unblocks AT LEAST one "blocked" thread; therefore it could unblock, for instance, 2 threads.

So my questions, with the current code I have (below), are:

1) How can I ensure that only one of the worker threads takes care of the request? Do I need to add a "while check loop" as in all generic "producer consumer" implementations?

2) How do the threads that are unblocked via pthread_cond_broadcast (or by a pthread_cond_signal that unblocked more than one thread) contend for the mutex? Probably I have not grasped this yet...

The code of each worker thread is:

void *worker(void *arg)
{
    while(1)
    {
        printf("\n BEFORE LOCKING sthread_mutex with thread: %ld \n", syscall(SYS_gettid));
        pthread_mutex_lock(&sthread_mutex);
        printf("\n AFTER LOCKING sthread_mutex with thread: %ld \n", syscall(SYS_gettid));

        printf("\n BEFORE WAITING ON cond_var with thread: %ld \n", syscall(SYS_gettid));
        pthread_cond_wait(&cond_var, &sthread_mutex); // wait on condition variable
        printf("\n AFTER WAITING ON cond_var with thread: %ld \n", syscall(SYS_gettid));

        printf("\n got signal for thread: %ld \n", syscall(SYS_gettid));

        // extract the current request into a local variable
        // within the "private stack" of this thread
        Req localReq = globalReq;
        pthread_mutex_unlock(&sthread_mutex);
        printf("\n AFTER UNLOCKING sthread_mutex with thread: %ld \n", syscall(SYS_gettid));

        // perform the desired task
        task(localReq);
        printf("\n BEFORE calling sem_post with thread: %ld \n", syscall(SYS_gettid));
        sem_post(&sem);

    } // end while (1)

} // end of worker thread function

The code of the producer thread is:

void *producer(void *arg)
{
    while(1)
    {
        if(front != NULL)  // queue not empty
        {
            // go to sleep if all "worker threads" are occupied,
            // or decrement the number of free "worker threads" by 1
            printf(" scheduler thread BEFORE calling sem_wait on sem \n");
            sem_wait(&sem);

            // lock the sthread mutex in order to "synchronize" with the
            // "worker threads"...
            printf(" scheduler thread BEFORE locking sthread_mutex \n");
            pthread_mutex_lock(&sthread_mutex);
            printf(" scheduler thread AFTER locking sthread_mutex \n");

            globalReq = extract_element(); // this is the global request variable

            // notify the worker threads that an "arriving request" needs to
            // be taken care of
            printf(" scheduler thread BEFORE calling signal on cond_var \n");
            // pthread_cond_signal(&cond_var);
            pthread_cond_broadcast(&cond_var);
            printf(" scheduler thread AFTER calling signal on cond_var \n");

            // unlock the sthread mutex
            printf(" scheduler thread BEFORE UNLOCKING sthread_mutex \n");
            pthread_mutex_unlock(&sthread_mutex);
            printf(" scheduler thread AFTER UNLOCKING sthread_mutex \n");

        }  // queue not empty

        else
            continue;

    }  // end while (1)

} // end of producer

Another issue:

The producer thread calls sem_wait on the global semaphore (which is initialized at the beginning to the number of worker threads, in this case 3) in order to keep track of how many worker threads are currently handling a request. To complete this "mechanism", each worker thread, once it is done handling the request it "won" (when contending over the condition variable), calls sem_post to indicate that "another worker thread is available".

3) Is this a proper (good / efficient) way to implement this kind of "signaling of how many worker threads are available"?

4) What are the benefits and disadvantages of "passing" the request via the global variable (globalReq) shared between the producer and the worker threads, as described above? Is it a wise way to pass it, or would it be better to "just" create a "new request variable" (on the heap, using malloc) "dedicated" to each worker thread and request (and have each worker thread free it once it is done serving the request)?

5) Feel free to give me any other comments (good or bad) you may have about this piece of code.

EDIT:

Hey there everyone,

Some extra issues:

In addition to the producer and the worker threads, there is ANOTHER THREAD, called listener, whose ONLY TASK is to insert arriving requests into the linked list (FIFO queue), so that is not really the task of the producer mentioned before.

So my new questions are:

8) With that additional information about my program, again: is the "signaling mechanism" I built with the semaphore efficient?

9) The linked list managed by the producer and listener threads has two global pointers, front and rear, pointing to the head and tail of the linked list respectively (the head of the list is the first request to be handled).

Below are the implementations of the insertion function, performed by the listener thread, and the "extraction" function, performed by the producer thread.

In order to synchronize these two threads over the "queue" (the linked list), I used a mutex shared between them called qmutex.

My question is: regarding the two functions below, where is the "best" place to lock and unlock the mutex in each of them?

Thanks a lot,

Guy.

The insertion function:

void insertion(void *toInsert)
{
    struct getInfo *req = (struct getInfo *)toInsert;
    newNode = (N*)malloc(sizeof(N));
    newNode->req = req;
    newNode->next = NULL;

    // WHERE SHOULD I LOCK (AND UNLOCK) THE QUEUE MUTEX ????????????????????????
    if(front == NULL)
    {
        front = newNode;
        printf("empty list - insert as head \n");
    }

    else
    {
        rear->next = newNode;
        printf(" NOT AN EMPTY list - insert as last node \n");
    }

    rear = newNode;
}  // end of insertion

The extraction function:

Req extract_element()
{
    if(front == NULL)
        printf("\n empty queue \n");
    else
    {
        Req ret;
        tmpExtract = front;
        ret.socketNum = tmpExtract->req->socketNum;
        ret.type = tmpExtract->req->type;
        printf("\n extracting node with sockNum: %d \n", ret.socketNum);
        front = front->next;
        free(tmpExtract);
        return(ret);
    }
} // end of extract_element

Solution

Instead of answering your questions directly, first, here's a description of the typical way to do this:

You have some sort of queue or list that you add your work data to. Whenever you add a set of work data, you first lock a mutex, add the data, signal your condition variable, and then unlock the mutex.

Your worker threads then lock the mutex and wait for the condition in a loop, while the queue is empty. When the signal is sent, one or more workers will wake up, but only one at a time will grab the mutex. With the mutex locked, the "winner" checks if something is in the queue, extracts it, unlocks the mutex, and does the necessary work. After the mutex is unlocked, other threads may wake up too (and will, if the condition was broadcast), and will either extract the next piece of work from the queue or go back to waiting if the queue is empty.

In code, it looks a bit like this:

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

#define WORKER_COUNT 3

pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t workers[WORKER_COUNT];

static int queueSize = 0;

static void *workerFunc(void *arg)
{
    int id = (int)(intptr_t)arg;   // worker number passed through the void* argument
    printf("Starting worker %d\n", id);
    while(1) {
        pthread_mutex_lock(&mutex);
        while(queueSize < 1) {
            pthread_cond_wait(&cond, &mutex);
        }
        printf("Worker %d woke up, processing queue #%d\n", id, queueSize);
        //Extract work from queue
        --queueSize;
        pthread_mutex_unlock(&mutex);
        //Do work
        sleep(1);
    }
    return NULL;   // never reached
}

int main()
{
    int i;

    pthread_mutex_init(&mutex, 0);
    pthread_cond_init(&cond, 0);

    for(i=0; i<WORKER_COUNT; ++i) {
        pthread_create(&(workers[i]), 0, workerFunc, (void*)(intptr_t)(i+1));
    }

    sleep(1);
    pthread_mutex_lock(&mutex);
    //Add work to queue
    queueSize = 5;
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);

    sleep(10);

    return 0;
}

(I've left out cleaning up after the threads, and the passing of the worker number to the thread is quick and dirty, but works in this case).

Here, the workers will be woken up by pthread_cond_broadcast() and will keep running as long as there's something in the queue (until queueSize is back to 0; imagine that there's an actual queue as well), then go back to waiting.
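To make the "imagine that there's an actual queue" part concrete, here is a minimal sketch that replaces the queueSize counter with a FIFO linked list guarded by the same mutex/condition-variable pair. The names (struct job, enqueue, run_queue_demo, q_shutdown) are hypothetical, and the shutdown flag is an addition that is not in the example above: it lets the workers exit so they can be joined, which also covers the cleanup that example leaves out. Nodes are heap-allocated and freed by whichever worker dequeues them, which is one way to give each worker its own private copy of a request.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define WORKER_COUNT 3

/* Hypothetical work item: one heap-allocated node of a FIFO list. */
struct job {
    int value;
    struct job *next;
};

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;
static struct job *q_front = NULL;  /* oldest job, removed first */
static struct job *q_rear = NULL;   /* newest job, appended last */
static int q_shutdown = 0;          /* set once no more jobs will arrive */
static int jobs_done = 0;           /* protected by q_mutex */

/* Append a job and wake one waiting worker. */
static void enqueue(int value)
{
    struct job *j = malloc(sizeof *j);  /* allocate outside the lock */
    if (j == NULL)
        abort();
    j->value = value;
    j->next = NULL;

    pthread_mutex_lock(&q_mutex);
    if (q_rear == NULL)
        q_front = j;
    else
        q_rear->next = j;
    q_rear = j;
    pthread_cond_signal(&q_cond);       /* one new job: one worker is enough */
    pthread_mutex_unlock(&q_mutex);
}

static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&q_mutex);
        /* Re-check the predicate after every wakeup, spurious or not. */
        while (q_front == NULL && !q_shutdown)
            pthread_cond_wait(&q_cond, &q_mutex);
        if (q_front == NULL) {          /* shutting down and queue drained */
            pthread_mutex_unlock(&q_mutex);
            return NULL;
        }
        struct job *j = q_front;        /* claim the head node under the lock */
        q_front = j->next;
        if (q_front == NULL)
            q_rear = NULL;
        pthread_mutex_unlock(&q_mutex);

        /* "Do work" outside the lock; this thread now owns the node. */
        free(j);

        pthread_mutex_lock(&q_mutex);
        ++jobs_done;
        pthread_mutex_unlock(&q_mutex);
    }
}

/* Start the workers, queue n jobs, shut down, join. Returns jobs processed. */
int run_queue_demo(int n)
{
    pthread_t tids[WORKER_COUNT];

    pthread_mutex_lock(&q_mutex);
    q_shutdown = 0;
    jobs_done = 0;
    pthread_mutex_unlock(&q_mutex);

    for (int i = 0; i < WORKER_COUNT; ++i) {
        int rc = pthread_create(&tids[i], NULL, worker, NULL);
        assert(rc == 0);
        (void)rc;
    }

    for (int i = 0; i < n; ++i)
        enqueue(i);

    pthread_mutex_lock(&q_mutex);
    q_shutdown = 1;
    pthread_cond_broadcast(&q_cond);    /* wake every waiter so it can exit */
    pthread_mutex_unlock(&q_mutex);

    for (int i = 0; i < WORKER_COUNT; ++i)
        pthread_join(tids[i], NULL);
    return jobs_done;                   /* safe to read: all workers joined */
}
```

Calling run_queue_demo(5) from main() should return 5: every queued job is processed exactly once, no matter which worker picks it up.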

Back to the questions:

1: The mutex and the guard variable (here, queueSize) take care of this. You also need the guard variable because your thread may be woken up for other reasons too (so-called spurious wakeups, see http://linux.die.net/man/3/pthread_cond_wait).

2: The woken threads contend for the mutex just as any other threads would when calling pthread_mutex_lock().

3: I'm not sure why you'd need to signal the number of available worker threads back to the producer?

4: The queue needs to be accessible from both your producer and consumer - but can still be encapsulated with functions (or classes if you're using C++) in various ways.

5: I hope the above is enough?

6: The thing with pthread_cond_wait() is that it can have spurious wakeups. That is, it might wake up even though you did not signal the condition. You therefore need a guard variable (the while() loop around the pthread_cond_wait() in my code example), to make sure that there actually is a reason to wake up, once pthread_cond_wait() returns. You then protect the guard variable (and whatever work data you need to extract) with the same mutex as the condition uses, and then you can be certain that only one thread will do each piece of work.
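A small sketch of that guarantee, assuming a counter named available stands in for the request (all names here are hypothetical): one request is posted, all three workers are deliberately woken with pthread_cond_broadcast(), and because each worker re-checks and decrements the guard while holding the mutex, only one of them can claim it. The done flag is an added assumption so that the demo terminates.

```c
#include <assert.h>
#include <pthread.h>

#define WORKER_COUNT 3

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int available = 0;  /* guard variable: number of pending requests */
static int done = 0;       /* no more requests will be posted */
static int handled = 0;    /* requests actually claimed by workers */

static void *worker(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&mutex);
    for (;;) {
        /* Sleep until there is work or we are told to stop. */
        while (available == 0 && !done)
            pthread_cond_wait(&cond, &mutex);
        if (available > 0) {
            --available;   /* claim the request while holding the mutex */
            ++handled;
            /* A real worker would copy the request, unlock, then work. */
        } else {
            break;         /* done and nothing left to claim */
        }
    }
    pthread_mutex_unlock(&mutex);
    return NULL;
}

/* Post exactly one request, wake every worker, and count the claims. */
int run_broadcast_demo(void)
{
    pthread_t tids[WORKER_COUNT];
    available = 0;
    done = 0;
    handled = 0;

    for (int i = 0; i < WORKER_COUNT; ++i) {
        int rc = pthread_create(&tids[i], NULL, worker, NULL);
        assert(rc == 0);
        (void)rc;
    }

    pthread_mutex_lock(&mutex);
    available = 1;                  /* exactly one request */
    pthread_cond_broadcast(&cond);  /* wake all workers on purpose */
    pthread_mutex_unlock(&mutex);

    pthread_mutex_lock(&mutex);
    done = 1;
    pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&mutex);

    for (int i = 0; i < WORKER_COUNT; ++i)
        pthread_join(tids[i], NULL);
    return handled;
}
```

run_broadcast_demo() should return 1. By contrast, in the question's worker, which waits without a predicate and then reads globalReq, every thread woken by the broadcast could copy the same request.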

7: Instead of having the producer go to sleep, I'd just let it add whatever data it can extract to the work queue. If the queue is full, then it should go to sleep; otherwise it should just keep adding stuff.
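A minimal sketch of a bounded work queue where the producer only sleeps when the queue is full, using one mutex and two condition variables (not_full, not_empty). This swaps the question's semaphore for condition variables; CAPACITY, put(), get() and run_bounded_demo() are illustrative names, and the capacity of 2 is chosen only so that the producer actually has to wait.

```c
#include <assert.h>
#include <pthread.h>

#define CAPACITY 2   /* deliberately tiny so the producer really blocks */
#define ITEMS 10

static int buf[CAPACITY];   /* ring buffer of pending work */
static int head = 0;        /* index of the oldest item */
static int count = 0;       /* number of items currently queued */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
static pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

static void put(int v)
{
    pthread_mutex_lock(&mutex);
    while (count == CAPACITY)              /* full: producer sleeps */
        pthread_cond_wait(&not_full, &mutex);
    buf[(head + count) % CAPACITY] = v;
    ++count;
    pthread_cond_signal(&not_empty);       /* wake one consumer */
    pthread_mutex_unlock(&mutex);
}

static int get(void)
{
    pthread_mutex_lock(&mutex);
    while (count == 0)                     /* empty: consumer sleeps */
        pthread_cond_wait(&not_empty, &mutex);
    int v = buf[head];
    head = (head + 1) % CAPACITY;
    --count;
    pthread_cond_signal(&not_full);        /* wake the producer if it waits */
    pthread_mutex_unlock(&mutex);
    return v;
}

static void *producer(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITEMS; ++i)
        put(i);
    return NULL;
}

/* Main thread acts as the consumer; returns the sum of all items. */
int run_bounded_demo(void)
{
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, producer, NULL);
    assert(rc == 0);
    (void)rc;

    int sum = 0;
    for (int i = 0; i < ITEMS; ++i)
        sum += get();
    pthread_join(tid, NULL);
    return sum;
}
```

run_bounded_demo() returns 45 (0 + 1 + ... + 9). With two condition variables, each one only ever has one kind of waiter, so pthread_cond_signal() always wakes a thread that can make progress.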

8: With your Listener thread, I can't really see why you even need your Producer thread. Why not let the Workers call extract_element() themselves?

9: You need to protect all accesses to the list variables. That is, in insertion(), lock the mutex just before you first access front, and unlock it after your last access of rear. Same thing in extract_element() - although you'll need to rewrite the function to also have a valid return value when the queue is empty.
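A sketch of where the locking could go, assuming simplified stand-ins for the question's types: Req holds only socketNum and type, the node type N stores the request by value, and both functions have changed (hypothetical) signatures so that extract_element() can report an empty queue instead of falling off the end without a return value. The lock covers every access to front and rear; malloc() and free() happen outside it, because the node is not yet (or no longer) shared.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the question's Req and N types. */
typedef struct {
    int socketNum;
    int type;
} Req;

typedef struct node {
    Req req;             /* stored by value, so the node owns its copy */
    struct node *next;
} N;

static N *front = NULL;  /* head: next request to be handled */
static N *rear = NULL;   /* tail: most recently inserted request */
static pthread_mutex_t qmutex = PTHREAD_MUTEX_INITIALIZER;

/* Listener side. Returns 0 on success, -1 if malloc fails. */
int insertion(Req r)
{
    N *newNode = malloc(sizeof *newNode);  /* not shared yet: no lock */
    if (newNode == NULL)
        return -1;
    newNode->req = r;
    newNode->next = NULL;

    pthread_mutex_lock(&qmutex);           /* before first access to front */
    if (front == NULL)
        front = newNode;
    else
        rear->next = newNode;
    rear = newNode;
    pthread_mutex_unlock(&qmutex);         /* after last access to rear */
    return 0;
}

/* Returns 1 and fills *out if a request was removed, 0 if the queue was empty. */
int extract_element(Req *out)
{
    assert(out != NULL);
    pthread_mutex_lock(&qmutex);
    N *tmp = front;
    if (tmp == NULL) {
        pthread_mutex_unlock(&qmutex);
        return 0;
    }
    front = tmp->next;
    if (front == NULL)
        rear = NULL;                       /* keep rear consistent when emptied */
    pthread_mutex_unlock(&qmutex);

    *out = tmp->req;                       /* node is private now: copy, free */
    free(tmp);
    return 1;
}
```

The return value lets the caller tell "got a request" apart from "queue was empty" without inventing a sentinel Req.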

Other suggestions

I want to be certain about the previous issue and one more thing, so my new questions are:

6) If I still want to "stick" with my implementation of the threads, i.e. use the mutex and the condition variable as I wrote them, how can I be certain that when the producer thread calls pthread_cond_signal, ONLY one thread will continue to run (from the instruction right after pthread_cond_wait)?
    Do I need to add another check or use another variable, or (as I prefer) can I just rely on the generic pthread_cond_wait/signal mechanism?

NOTE: I used pthread_cond_broadcast in order "to simulate" a situation where pthread_cond_signal unblocks more than one thread.

7) To clarify the "logic" of my program:

The reason the producer thread decrements the semaphore whenever it assigns a request, while the worker threads increment it, is to make the producer thread "go to sleep" on the semaphore if ALL THE WORKER THREADS are busy, i.e. "wait with the extracting and assigning procedure" until (at least) one of the worker threads is available to handle the request. Is this a good implementation of what I am trying to achieve, or is there a better way of doing it?

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow