質問

I'm working on a couple of C++11 work queue classes. The first class, command_queue is a multi producer single consumer work queue. Multiple threads can post commands, and a single thread calls "wait()" and "pop_back()" in a loop to process those commands.

The second class, Actor uses command_queue and actually provides a consumer thread... besides that, the idea is that post() will return a future so that clients can either block until the command is processed, or continue running (actor also adds the idea of a result type). To implement this, I'm attempting to store std::promise's in a std::pair in the work queue. I believe I am fairly close, but i'm having a problem in the _entry_point function below... specifically, when I'm trying to get the std::pair out of the command queue I'm getting a "use of deleted function" compiler error... I'll put the actual error i'm getting from the compiler below the code (you should be able to save this to a text file and compile it yourself, it's stand alone c++11 code).

#include <mutex>
#include <condition_variable>
#include <future>
#include <list>
#include <stdio.h>

template<class T>
class command_queue
{

public:
    command_queue() = default;
    command_queue( const command_queue& ) = delete;
    virtual ~command_queue() noexcept = default;

    command_queue& operator = ( const command_queue& ) = delete;

    void start()
    {
        std::unique_lock<std::recursive_mutex> g( _queueLock );
        _started = true;
    }

    bool started()
    {
        return _started;
    }

    void stop()
    {
        std::unique_lock<std::recursive_mutex> g( _queueLock );
        _started = false;
        _queueCond.notify_one();
    }

    void post_front( const T& cmd )
    {
        std::unique_lock<std::recursive_mutex> g( _queueLock );
        _queue.push_front( cmd );
        _queueCond.notify_one();
    }

    void post_front( T&& cmd )
    {
        std::unique_lock<std::recursive_mutex> g( _queueLock );
        _queue.push_front( cmd );
        _queueCond.notify_one();
    }

    void wait()
    {
        std::unique_lock<std::recursive_mutex> g( _queueLock );
        _queueCond.wait( g, [this](){return !this->_queue.empty() ? true : !this->_started;});
    }

    T pop_back()
    {
        std::unique_lock<std::recursive_mutex> g( _queueLock );
        auto val = _queue.back();
        _queue.pop_back();
        return val;
    }

private:
    std::recursive_mutex _queueLock;
    std::condition_variable_any _queueCond;
    std::list<T> _queue;
    bool _started = false;
};

template<class T, class U>
class actor
{
public:
    actor() :
        _started( false ),
        _thread(),
        _queue()
    {
    }

    actor( const actor& ) = delete;

    virtual ~actor() noexcept
    {
        if( _started )
            stop();
    }

    actor& operator = ( const actor& ) = delete;

    void start()
    {
        _started = true;

        _queue.start();

        _thread = std::thread( &actor<T,U>::_entry_point, this );
    }

    void stop()
    {
        _started = false;

        _queue.stop();

        _thread.join();
    }

    std::future<U> post( const T& cmd )
    {
        std::promise<U> p;
        std::future<U> waiter = p.get_future();

        _queue.post_front( std::pair<T,std::promise<U>>(cmd, std::move(p)) );

        return waiter;
    }

    virtual U process( const T& cmd ) = 0;

protected:
    void _entry_point()
    {
        while( _started )
        {
            _queue.wait();

            if( !_started )
                continue;

            std::pair<T,std::promise<U>> item = _queue.pop_back();

            item.second.set_value( process( item.first ) );
        }
    }

    bool _started;
    std::thread _thread;
    command_queue<std::pair<T,std::promise<U>>> _queue;
};

class int_printer : public actor<int,bool>
{
public:
    virtual bool process( const int& cmd ) override
    {
        printf("%d",cmd);
        return true;
    }
};

using namespace std;

int main( int argc, char* argv[] )
{
//    std::promise<bool> p;
//    list<std::pair<int,std::promise<bool>>> promises;
//    promises.push_back( make_pair<int,std::promise<bool>>(10,std::move(p)) );

    int_printer a;
    a.start();
    future<bool> result = a.post( 10 );
    a.stop();
}

[developer@0800275b874e projects]$ g++ -std=c++11 pf.cpp -opf -lpthread
pf.cpp: In instantiation of ‘T command_queue<T>::pop_back() [with T = std::pair<int, std::promise<bool> >]’:
pf.cpp:133:65:   required from ‘void actor<T, U>::_entry_point() [with T = int; U = bool]’
pf.cpp:99:9:   required from ‘void actor<T, U>::start() [with T = int; U = bool]’
pf.cpp:163:13:   required from here
pf.cpp:60:32: error: use of deleted function ‘constexpr std::pair<_T1, _T2>::pair(const std::pair<_T1, _T2>&) [with _T1 = int; _T2 = std::promise<bool>]’
In file included from /usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/utility:72:0,
                 from /usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/tuple:38,
                 from /usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/mutex:39,
                 from pf.cpp:2:
/usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/bits/stl_pair.h:119:17: note: ‘constexpr std::pair<_T1, _T2>::pair(const std::pair<_T1, _T2>&) [with _T1 = int; _T2 = std::promise<bool>]’ is implicitly deleted because the default definition would be ill-formed:
/usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/bits/stl_pair.h:119:17: error: use of deleted function ‘std::promise<_Res>::promise(const std::promise<_Res>&) [with _Res = bool]’
In file included from pf.cpp:4:0:
/usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/future:963:7: error: declared here
In file included from /usr/lib/gcc/x86_64-redhat-linux/4.7.0/../../../../include/c++/4.7.0/list:64:0,
                 from pf.cpp:5:
役に立ちましたか?

解決

Promises aren't copyable (which makes sense - they represent a unique state). You need to use std::move in several places to transfer the unique ownership of the promise along.

Specifically, your home-brew queue class needs to permit moving, e.g.

auto val = std::move(_queue.back());
_queue.pop_back();
return val;

他のヒント

You protect writes to command_queue::_started with _queueLock, but not the read in command_queue::started(); this is a data race if some thread can call started while another thread is performing a modification (e.g., stop()).

Several small observations:

  • It doesn't make your program incorrect, but it's better to notify a condition variable outside the mutex. If you notify with the mutex held, another core may waste a microsecond or two scheduling a waiting thread to run only to immediately block on the mutex.

  • Your post_front(T&&) is copying the passed item into the queue due to missing a std::move:

    _queue.push_front( cmd );
    

    must be

    _queue.push_front( std::move( cmd ) );
    

    if you want it to actually be moved into the queue.

  • The predicate for the condition variable wait could be simplified from

    [this](){return !this->_queue.empty() ? true : !this->_started;}
    

    to

    [this]{return !_queue.empty() || !_started;}
    
  • None of the command_queue member functions call other command_queue functions, so you could use a plain std::mutex instead of std::recursive_mutex and std::condition_variable instead of std::condition_variable_any.

  • You could use std::lock_guard<std::mutex> instead of std::unique_lock<std::mutex> to lock the mutex in every member function except wait. It's ever-so-slightly lighter weight.

  • You have the traditional pop exception-safety issue: If the selected move/copy constructor for T fails with an exception when returning from pop_back after modifying the queue, that element is lost. The way you've written the function makes this occurrence extremely unlikely, since

    auto val = _queue.back();
    _queue.pop_back();
    return val;
    

    (or after Kerrek's fix)

    auto val = std::move(_queue.back());
    _queue.pop_back();
    return val;
    

    should qualify for copy elision with a decent compiler, constructing the returned object in-place before the pop_back happens. Just be aware that if future changes impede copy elision you'll introduce the exception safety problem. You can avoid the issue altogether by passing a T& or optional<T>& as a parameter and move assigning the result to that parameter.

  • actor::_started is unnecessary since it's effectively a proxy for actor::_queue::_started.

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top