Pregunta

Me gustaría implementar un escenario de productor / consumidor que obedezca a interfaces que son aproximadamente:

class Consumer {
private:
    vector<char> read(size_t n) {
        // If the internal buffer has `n` elements, then dequeue them
        // Otherwise wait for more data and try again
    }
public:
    void run() {
        read(10);
        read(4839);
        // etc
    }
    void feed(const vector<char> &more) {
        // Safely queue the data
        // Notify `read` that there is now more data
    }
};

En este caso, feed y run se ejecutarán en subprocesos separados y read debería ser una lectura de bloqueo (como recv y fread ). Obviamente, necesitaré algún tipo de exclusión mutua en mi deque, y necesitaré algún tipo de sistema de notificación para informar a leer para volver a intentarlo.

He escuchado que las variables de condición son el camino a seguir, pero toda mi experiencia en subprocesos múltiples reside en Windows y me está costando mucho envolver mi cabeza alrededor de ellas.

¡Gracias por cualquier ayuda!

(Sí, sé que es ineficiente devolver los vectores. No entremos en eso).

¿Fue útil?

Solución

Este código no está listo para producción. No se realiza ninguna comprobación de errores en los resultados de las llamadas a la biblioteca.

Envolví el bloqueo / desbloqueo del mutex en LockThread para que sea una excepción segura. Pero eso es todo.

Además, si lo estuviera haciendo con seriedad, envolvería las variables de exclusión y condición dentro de los objetos para que puedan ser objeto de abuso dentro de otros métodos de Consumer. Pero siempre que tome nota de que el bloqueo debe adquirirse antes de usar la variable de condición (de cualquier manera), esta simple situación puede mantenerse como está.

Fuera de interés, ¿has revisado la biblioteca de mejora de subprocesos?

#include <iostream>
#include <vector>
#include <pthread.h>

class LockThread
{
    public:
    LockThread(pthread_mutex_t& m)
        :mutex(m)
    {
        pthread_mutex_lock(&mutex);
    }
    ~LockThread()
    {
        pthread_mutex_unlock(&mutex);
    }
    private:
        pthread_mutex_t& mutex;
};
class Consumer
{
    pthread_mutex_t     lock;
    pthread_cond_t      cond;
    std::vector<char>   unreadData;
    public:
    Consumer()
    {
        pthread_mutex_init(&lock,NULL);
        pthread_cond_init(&cond,NULL);
    }
    ~Consumer()
    {
        pthread_cond_destroy(&cond);
        pthread_mutex_destroy(&lock);
    }

    private:
        std::vector<char> read(size_t n)
        {
            LockThread  locker(lock);
            while (unreadData.size() < n)
            {
                // Must wait until we have n char.
                // This is a while loop because feed may not put enough in.

                // pthread_cond() releases the lock.
                // Thread will not be allowed to continue until
                // signal is called and this thread reacquires the lock.

                pthread_cond_wait(&cond,&lock);

                // Once released from the condition you will have re-aquired the lock.
                // Thus feed() must have exited and released the lock first.
            }

            /*
             * Not sure if this is exactly what you wanted.
             * But the data is copied out of the thread safe buffer
             * into something that can be returned.
             */
            std::vector<char>   result(n); // init result with size n
            std::copy(&unreadData[0],
                      &unreadData[n],
                      &result[0]);

            unreadData.erase(unreadData.begin(),
                             unreadData.begin() + n);
            return (result);
        }
public:
    void run()
    {
        read(10);
        read(4839);
        // etc
    }
    void feed(const std::vector<char> &more)
    {
        LockThread  locker(lock);

        // Once we acquire the lock we can safely modify the buffer.
        std::copy(more.begin(),more.end(),std::back_inserter(unreadData));

        // Only signal the thread if you have the lock
        // Otherwise race conditions happen.
        pthread_cond_signal(&cond);

        // destructor releases the lock and thus allows read thread to continue.
    }
};


int main()
{
    Consumer    c;
}

Otros consejos

Tiendo a usar lo que llamo una " Cola sincronizada " ;. Envuelvo la cola normal y uso una clase de semáforo para bloquear y hacer el bloque de lectura como desee:

#ifndef SYNCQUEUE_20061005_H_
#define SYNCQUEUE_20061005_H_

#include <queue>
#include "Semaphore.h"

// similar, but slightly simpler interface to std::queue
// this queue implementation will serialize pushes and pops
// and block on a pop while empty (as apposed to throwing an exception)
// it also locks as neccessary on insertion and removal to avoid race 
// conditions

template <class T, class C = std::deque<T> > class SyncQueue {
protected:
    std::queue<T, C>    m_Queue;
    Semaphore           m_Semaphore;
    Mutex               m_Mutex;

public:
    typedef typename std::queue<T, C>::value_type value_type;
    typedef typename std::queue<T, C>::size_type size_type;

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {}

    bool empty() const              { return m_Queue.empty(); }
    size_type size() const          { return m_Queue.size(); }

    void push(const value_type& x);
    value_type pop();
};

template <class T, class C>
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) {
    // atomically push item
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived
    m_Semaphore.v();
}

template <class T, class C>
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() {
    // block until we have at least one item
    m_Semaphore.p();

    // atomically read and pop front item
    m_Mutex.lock();
    value_type ret = m_Queue.front();
    m_Queue.pop();
    m_Mutex.unlock();

    return ret;
}

#endif

Puede implementar semáforos y mutexes con los primitivos adecuados en su implementación de subprocesos.

NOTA: esta implementación es un ejemplo para elementos individuales en una cola, pero podría fácilmente envolver esto con una función que búfere los resultados hasta que se haya proporcionado N. algo como esto si es una cola de caracteres:

std::vector<char> func(int size) {
    std::vector<char> result;
    while(result.size() != size) {
        result.push_back(my_sync_queue.pop());
    }
    return result;
}

Voy a tirar un semipseudo código. Aquí están mis comentarios:

1) Granos muy grandes de bloqueo aquí. Si necesita un acceso más rápido, querrá repensar sus estructuras de datos. El STL no es seguro para subprocesos.

2) El bloqueo se bloqueará hasta que el mutex lo permita. La estructura de exclusión mutua es que permite que 1 hilo lo atraviese a la vez con el mecanismo de bloqueo / desbloqueo. No es necesario realizar una encuesta o algún tipo de estructura excepcional.

3) Este es un corte bastante sintácticamente hacky en el problema. No estoy siendo preciso con la sintaxis de API ni de C ++, pero creo que proporciona una solución semánticamente correcta.

4) Editado en respuesta al comentario.

class piper
{
pthread_mutex queuemutex;
pthread_mutex readymutex;
bool isReady; //init to false by constructor

//whatever else
};

piper::read()
{//whatever
pthread_mutex_lock(&queuemutex)
if(myqueue.size() >= n)
{ 
   return_queue_vector.push_back(/* you know what to do here */)

    pthread_mutex_lock(&readymutex)
    isReady = false;
    pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}

piper::push_em_in()
{
//more whatever
pthread_mutex_lock(&queuemutex)
//push push push
if(myqueue.size() >= n)
{
    pthread_mutex_lock(&readymutex)
    isReady = true;
    pthread_mutex_unlock(&readymutex)
}
pthread_mutex_unlock(&queuemutex)
}

Solo por diversión, aquí hay una implementación rápida y sucia que usa Boost. Utiliza pthreads debajo del capó en plataformas que lo admiten, y en Windows usa las operaciones de Windows.

boost::mutex access;
boost::condition cond;

// consumer
data read()
{
  boost::mutex::scoped_lock lock(access);
  // this blocks until the data is ready
  cond.wait(lock);

  // queue is ready
  return data_from_queue();
}

// producer
void push(data)
{
  boost::mutex::scoped_lock lock(access);
  // add data to queue

  if (queue_has_enough_data())
    cond.notify_one();  
}

Para más diversión, aquí está mi versión final. STL-ized sin ninguna buena razón. :-)

#include <algorithm>
#include <deque>
#include <pthread.h>

template<typename T>
class MultithreadedReader {
    std::deque<T>   buffer;
    pthread_mutex_t moreDataMutex;
    pthread_cond_t  moreDataCond;

protected:
    template<typename OutputIterator>
    void read(size_t count, OutputIterator result) {
        pthread_mutex_lock(&moreDataMutex);

        while (buffer.size() < count) {
            pthread_cond_wait(&moreDataCond, &moreDataMutex);
        }
        std::copy(buffer.begin(), buffer.begin() + count, result);
        buffer.erase(buffer.begin(), buffer.begin() + count);

        pthread_mutex_unlock(&moreDataMutex);
    }

public:
    MultithreadedReader() {
        pthread_mutex_init(&moreDataMutex, 0);
        pthread_cond_init(&moreDataCond, 0);
    }

    ~MultithreadedReader() {
        pthread_cond_destroy(&moreDataCond);
        pthread_mutex_destroy(&moreDataMutex);
    }

    template<typename InputIterator>
    void feed(InputIterator first, InputIterator last) {
        pthread_mutex_lock(&moreDataMutex);

        buffer.insert(buffer.end(), first, last);
        pthread_cond_signal(&moreDataCond);

        pthread_mutex_unlock(&moreDataMutex);
    }
};

Las colas asíncronas de Glib proporcionan el bloqueo y la suspensión al leer una cola vacía que está buscando. Consulte http://library.gnome.org/devel/glib /2.20/glib-Asynchronous-Queues.html Puede combinarlos con gthreads o gthread pools.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top