Domanda

I would like to have to possibility to make thread (consumer) express interest in when another thread (producer) makes something. But not all the time.

Basically I want to make a one-shot consumer. Ideally the producer through would go merrily about its business until one (or many) consumers signal that they want something, in which case the producer would push some data into a variable and signal that it has done so. The consumer will wait until the variable has become filled.

It must also be so that the one-shot consumer can decide that it has waited too long and abandon the wait (a la pthread_cond_timedwait)

I've been reading many articles and SO questions about different ways to synchronize threads. Currently I'm leaning towards a condition variable approach.

I would like to know if this is a good way to go about it (being a novice at thread programming I probably have quite a few bugs in there), or if it perhaps would be better to (ab)use semaphores for this situation? Or something else entirely? Just an atomic assign to a pointer variable if available? I currently don't see how these would work safely, probably because I'm trying to stay on the safe side, this application is supposed to run for months, without locking up. Can I do without the mutexes in the producer? i.e.: just signal a condition variable?

My current code looks like this:

consumer {
   pthread_mutex_lock(m);

   pred = true; /* signal interest */

   while (pred) {
       /* wait a bit and hopefully get an answer before timing out */
       pthread_cond_timedwait(c, m, t);

       /* it is possible that the producer never produces anything, in which
          case the pred will stay true, we must "designal" interest here,
          unfortunately the also means that a spurious wake could make us miss
          a good answer, no? How to combat this? */
       pred = false;
   }

   /* if we got here that means either an answer is available or we timed out */
   //... (do things with answer if not timed out, otherwise assign default answer)

   pthread_mutex_unlock(m);
}

/* this thread is always producing, but it doesn't always have listeners */
producer {
   pthread_mutex_lock(m);

   /* if we have a listener */
   if (pred) {
      buffer = "work!";

      pred = false;

      pthread_cond_signal(c);
   }

   pthread_mutex_unlock(m);
}

NOTE: I'm on a modern linux and can make use of platform-specific functionality if necessary NOTE2: I used the seemingly global variables m, c, and t. But these would be different for every consumer.

High-level recap

I want a thread to be able to register for an event, wait for it for a specified time and then carry on. Ideally it should be possible for more than one thread to register at the same time and all threads should get the same events (all events that came in the timespan).

È stato utile?

Soluzione

What you want is something similar to a std::future in c++ (doc). A consumer requests a task to be performed by a producer using a specific function. That function creates a struct called future (or promise), holding a mutex, a condition variable associated with the task as well as a void pointer for the result, and returns it to the caller. It also put that struct, the task id, and the parameters (if any) in a work queue handled by the producer.

struct future_s {
    pthread_mutex_t m;
    pthread_cond_t c;
    int flag;
    void *result;
};

// basic task outline
struct task_s {
    struct future_s result;
    int taskid;
};

// specific "mytask" task
struct mytask_s {
    struct future_s result;
    int taskid;
    int p1;
    float p2;
};

future_s *do_mytask(int p1, float p2){
     // allocate task data
     struct  mytask_s * t = alloc_task(sizeof(struct mytask_s));
     t->p1 = p1;
     t->p2 = p2;
     t->taskid = MYTASK_ID;
     task_queue_add(t);
    return (struct future_s *)t;
}

Then the producer pull the task out of the queue, process it, and once terminated, put the result in the future and trigger the variable.

The consumer may wait for the future or do something else.

For a cancellable futures, include a flag in the struct to indicate that the task is cancelled. The future is then either:

  • delivered, the consumer is the owner and must deallocate it
  • cancelled, the producer remains the owner and disposes of it.

The producer must therefore check that the future has not been cancelled before triggering the condition variable.

For a "shared" future, the flag turns into a number of subscribers. If the number is above zero, the order must be delivered. The consumer owning the result is left to be decided between all consumers (First come first served? Is the result passed along to all consumers?).

Any access to the future struct must be mutexed (which works well with the condition variable).

Regarding the queues, they may be implemented using a linked list or an array (for versions with limited capacity). Since the functions creating the futures may be called concurrently, they have to be protected with a lock, which is usually implemented with a mutex.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top