Domanda

Vorrei implementare uno scenario produttore / consumatore che obbedisce a interfacce che sono approssimativamente:

class Consumer {
private:
    vector<char> read(size_t n) {
        // If the internal buffer has `n` elements, then dequeue them
        // Otherwise wait for more data and try again
    }
public:
    void run() {
        read(10);
        read(4839);
        // etc
    }
    void feed(const vector<char> &more) {
        // Safely queue the data
        // Notify `read` that there is now more data
    }
};

In questo caso, feed e esegui verranno eseguiti su thread separati e read dovrebbe essere una lettura bloccante (come recv e fread ). Ovviamente, avrò bisogno di una sorta di mutua esclusione sulla mia deque e avrò bisogno di una specie di sistema di notifica per informare read per riprovare.

Sento le variabili di condizione sono la strada da percorrere, ma tutta la mia esperienza di multithreading risiede con Windows e sto facendo fatica ad avvolgermi intorno.

Grazie per l'aiuto!

(Sì, lo so che è inefficiente restituire i vettori. Non entriamo in questo.)

È stato utile?

Soluzione

Questo codice non è pronto per la produzione. Non viene eseguito alcun controllo degli errori sui risultati di eventuali chiamate in biblioteca.

Ho bloccato il blocco / sblocco del mutex in LockThread, quindi è eccezionalmente sicuro. Ma questo è tutto.

Inoltre, se lo facessi sul serio, avvolgerei il mutex e condizionerei le variabili all'interno degli oggetti in modo che possano essere maltrattate all'interno di altri metodi di consumo. Ma fintanto che prendi nota che il blocco deve essere acquisito prima di utilizzare la variabile condizione (in alcun modo), questa semplice situazione può rimanere così com'è.

Per interesse hai controllato la libreria di thread di boost?

#include <iostream>
#include <vector>
#include <pthread.h>

class LockThread
{
    public:
    LockThread(pthread_mutex_t& m)
        :mutex(m)
    {
        pthread_mutex_lock(&mutex);
    }
    ~LockThread()
    {
        pthread_mutex_unlock(&mutex);
    }
    private:
        pthread_mutex_t& mutex;
};
class Consumer
{
    pthread_mutex_t     lock;
    pthread_cond_t      cond;
    std::vector<char>   unreadData;
    public:
    Consumer()
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&cond,NULL);
    }
    ~Consumer()
    {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&lock);
    }

    private:
        std::vector<char> read(size_t n)
        {
            LockThread  locker(lock);
            while (unreadData.size() < n)
            {
                // Must wait until we have n char.
                // This is a while loop because feed may not put enough in.

                // pthread_cond() releases the lock.
                // Thread will not be allowed to continue until
                // signal is called and this thread reacquires the lock.

                pthread_cond_wait(&cond,&lock);

                // Once released from the condition you will have re-aquired the lock.
                // Thus feed() must have exited and released the lock first.
            }

            /*
             * Not sure if this is exactly what you wanted.
             * But the data is copied out of the thread safe buffer
             * into something that can be returned.
             */
            std::vector<char>   result(n); // init result with size n
            std::copy(&unreadData[0],
                      &unreadData[n],
                      &result[0]);

            unreadData.erase(unreadData.begin(),
                             unreadData.begin() + n);
            return (result);
        }
public:
    void run()
    {
        read(10);
        read(4839);
        // etc
    }
    void feed(const std::vector<char> &more)
    {
        LockThread  locker(lock);

        // Once we acquire the lock we can safely modify the buffer.
        std::copy(more.begin(),more.end(),std::back_inserter(unreadData));

        // Only signal the thread if you have the lock
        // Otherwise race conditions happen.
        pthread_cond_signal(&cond);

        // destructor releases the lock and thus allows read thread to continue.
    }
};


int main()
{
    Consumer    c;
}

Altri suggerimenti

Tendo a usare quella che chiamo "Coda sincronizzata". Avvolgo la normale coda e utilizzo una classe Semaphore sia per bloccare che per rendere il blocco di lettura come desideri:

#ifndef SYNCQUEUE_20061005_H_
#define SYNCQUEUE_20061005_H_

#include <queue>
#include "Semaphore.h"

// similar, but slightly simpler interface to std::queue
// this queue implementation will serialize pushes and pops
// and block on a pop while empty (as apposed to throwing an exception)
// it also locks as neccessary on insertion and removal to avoid race 
// conditions

template <class T, class C = std::deque<T> > class SyncQueue {
protected:
    std::queue<T, C>    m_Queue;
    Semaphore           m_Semaphore;
    Mutex               m_Mutex;

public:
    typedef typename std::queue<T, C>::value_type value_type;
    typedef typename std::queue<T, C>::size_type size_type;

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {}

    bool empty() const              { return m_Queue.empty(); }
    size_type size() const          { return m_Queue.size(); }

    void push(const value_type& x);
    value_type pop();
};

template <class T, class C>
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) {
    // atomically push item
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived
    m_Semaphore.v();
}

template <class T, class C>
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() {
    // block until we have at least one item
    m_Semaphore.p();

    // atomically read and pop front item
    m_Mutex.lock();
    value_type ret = m_Queue.front();
    m_Queue.pop();
    m_Mutex.unlock();

    return ret;
}

#endif

Puoi implementare semafori e mutex con le primitive appropriate nella tua implementazione di threading.

NOTA: questa implementazione è un esempio per singoli elementi in una coda, ma potresti facilmente avvolgerlo con una funzione che buffer i risultati fino a quando non viene fornito N. qualcosa del genere se si tratta di una coda di caratteri:

std::vector<char> func(int size) {
    std::vector<char> result;
    while(result.size() != size) {
        result.push_back(my_sync_queue.pop());
    }
    return result;
}

Lancerò un po 'di pseudo-codice. Ecco i miei commenti:

1) Grani molto grandi di bloccaggio qui. Se hai bisogno di un accesso più veloce, vorrai ripensare le tue strutture dati. STL non è thread-safe.

2) Il blocco si bloccherà fino a quando il mutex non lo farà passare. La struttura mutex è che consente 1 passaggio attraverso di essa alla volta con il meccanismo di blocco / sblocco. Non è necessario eseguire il polling o una struttura di tipo eccezionale.

3) Questo è un problema piuttosto sintatticamente confuso al problema. Non sono preciso con la sintassi API né C ++, ma credo che fornisca una soluzione semanticamente corretta.

4) Modificato in risposta al commento.

class piper
{
pthread_mutex queuemutex;
pthread_mutex readymutex;
bool isReady; //init to false by constructor

//whatever else
};

piper::read()
{//whatever
pthread_mutex_lock(&queuemutex)
if(myqueue.size() >= n)
{ 
   return_queue_vector.push_back(/* you know what to do here */)

    pthread_mutex_lock(&readymutex)
    isReady = false;
    pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}

piper::push_em_in()
{
//more whatever
pthread_mutex_lock(&queuemutex)
//push push push
if(myqueue.size() >= n)
{
    pthread_mutex_lock(&readymutex)
    isReady = true;
    pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}

Solo per divertimento, ecco un'implementazione veloce e sporca con Boost. Utilizza pthreads sotto il cofano su piattaforme che lo supportano e su Windows utilizza le operazioni di Windows.

boost::mutex access;
boost::condition cond;

// consumer
data read()
{
  boost::mutex::scoped_lock lock(access);
  // this blocks until the data is ready
  cond.wait(lock);

  // queue is ready
  return data_from_queue();
}

// producer
void push(data)
{
  boost::mutex::scoped_lock lock(access);
  // add data to queue

  if (queue_has_enough_data())
    cond.notify_one();  
}

Per ancora più divertimento, ecco la mia versione finale. STL-ized senza una buona ragione. : -)

#include <algorithm>
#include <deque>
#include <pthread.h>

template<typename T>
class MultithreadedReader {
    std::deque<T>   buffer;
    pthread_mutex_t moreDataMutex;
    pthread_cond_t  moreDataCond;

protected:
    template<typename OutputIterator>
    void read(size_t count, OutputIterator result) {
        pthread_mutex_lock(&moreDataMutex);

        while (buffer.size() < count) {
            pthread_cond_wait(&moreDataCond, &moreDataMutex);
        }
        std::copy(buffer.begin(), buffer.begin() + count, result);
        buffer.erase(buffer.begin(), buffer.begin() + count);

        pthread_mutex_unlock(&moreDataMutex);
    }

public:
    MultithreadedReader() {
        pthread_mutex_init(&moreDataMutex, 0);
        pthread_cond_init(&moreDataCond, 0);
    }

    ~MultithreadedReader() {
        pthread_cond_destroy(&moreDataCond);
        pthread_mutex_destroy(&moreDataMutex);
    }

    template<typename InputIterator>
    void feed(InputIterator first, InputIterator last) {
        pthread_mutex_lock(&moreDataMutex);

        buffer.insert(buffer.end(), first, last);
        pthread_cond_signal(&moreDataCond);

        pthread_mutex_unlock(&moreDataMutex);
    }
};

Le code asincrone di Glib forniscono il blocco e la sospensione durante la lettura di una coda vuota che stai cercando. Vedi http://library.gnome.org/devel/glib /2.20/glib-Asynchronous-Queues.html Puoi combinarli con gthreads o gthread pool.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top