Question

I have an object that runs around a boost::asio::io_service which has some properties. Something like that:

class Foo
{
  private:

    // Not an int in my real code, but it doesn't really matter.
    int m_bar;
    boost::asio::io_service& m_io_service;
    boost::asio::strand m_bar_strand;
};

m_bar is to be used only from a handler that is called through the strand m_bar_strand. This allows me not to lock from within those handlers.

To set the m_bar property from outside a thread that runs io_service::run() I wrote an asynchronous_setter, like so:

class Foo
{
  public:
    void async_get_bar(function<void (int)> handler)
    {
      m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
    }

    void async_set_bar(int value, function<void ()> handler)
    {
      m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
    }

  private:

    void do_get_bar(function<void (int)> handler)
    {
      // This is only called from within the m_bar_strand, so we are safe.

      // Run the handler to notify the caller.
      handler(m_bar);
    }

    void do_set_bar(int value, function<void ()> handler)
    {
      // This is only called from within the m_bar_strand, so we are safe.
      m_bar = value;

      // Run the handler to notify the caller.
      handler();
    }

    int m_bar;
    boost::asio::io_service& m_io_service;
    boost::asio::strand m_bar_strand;
};

This works perfectly but now I'd like to write a synchronous version of set_bar that sets the value and returns only when the set was effective. It must still guarantee that the effective set will occur within the m_bar_strand. Ideally, something reentrant.

I can imagine solutions with semaphores that would be modified from within the handler but everything I come up seems hackish and really not elegant. Is there something in Boost/Boost Asio that allows such a thing?

How would you proceed to implement this method?

Was it helpful?

Solution

If you need to synchronously wait on a value to be set, then Boost.Thread's futures may provide an elegant solution:

The futures library provides a means of handling synchronous future values, whether those values are generated by another thread, or on a single thread in response to external stimuli, or on-demand.

In short, a boost::promise is created and allows for a value to be set on it. The value can later be retrieved via an associated boost::future. Here is a basic example:

boost::promise<int> promise;
boost::unique_future<int> future = promise.get_future();

// start asynchronous operation that will invoke future.set_value(42)
...

assert(future.get() == 42); // blocks until future has been set.

Two other notable benefits to this approach:

  • future is part of C++11.
  • Exceptions can even be passed to future via promise::set_exception(), supporting an elegant way to provide exceptions or errors to the caller.

Here is a complete example based on the original code:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class Foo
{
public:

  Foo(boost::asio::io_service& io_service)
    : m_io_service(io_service),
      m_bar_strand(io_service)
  {}

public:

  void async_get_bar(boost::function<void(int)> handler)
  {
    m_bar_strand.post(bind(&Foo::do_get_bar, this, handler));
  }

  void async_set_bar(int value, boost::function<void()> handler)
  {
    m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
  }

  int bar()
  {
    typedef boost::promise<int> promise_type;
    promise_type promise;

    // Pass the handler to async operation that will set the promise.
    void (promise_type::*setter)(const int&) = &promise_type::set_value;
    async_get_bar(boost::bind(setter, &promise, _1));

    // Synchronously wait for promise to be fulfilled.
    return promise.get_future().get();
  }

  void bar(int value)
  {
    typedef boost::promise<void> promise_type;
    promise_type promise;

    // Pass the handler to async operation that will set the promise.
    async_set_bar(value, boost::bind(&promise_type::set_value, &promise));

    // Synchronously wait for the future to finish.
    promise.get_future().wait();
  }

private:

  void do_get_bar(boost::function<void(int)> handler)
  {
    // This is only called from within the m_bar_strand, so we are safe.

    // Run the handler to notify the caller.
    handler(m_bar);
  }

  void do_set_bar(int value, boost::function<void()> handler)
  {
    // This is only called from within the m_bar_strand, so we are safe.
    m_bar = value;

    // Run the handler to notify the caller.
    handler();
  }

  int m_bar;
  boost::asio::io_service& m_io_service;
  boost::asio::strand m_bar_strand;
};

int main()
{
  boost::asio::io_service io_service;
  boost::asio::io_service::work work(io_service);
  boost::thread t(
      boost::bind(&boost::asio::io_service::run, boost::ref(io_service)));

  Foo foo(io_service);
  foo.bar(21);
  std::cout << "foo.bar is " << foo.bar() << std::endl;
  foo.bar(2 * foo.bar());
  std::cout << "foo.bar is " << foo.bar() << std::endl;

  io_service.stop();
  t.join();
}

which provides the following output:

foo.bar is 21
foo.bar is 42

OTHER TIPS

You could use a pipe to notify the synchronous method when the value is set in async_set_bar(). Warning, the below code is brain-compiled and likely has errors but it should get the point across

#include <boost/asio.hpp>

#include <iostream>
#include <thread>                                                                                                                       

class Foo                                                                                                                               
{
public:                                                                                                                                 
    Foo( boost::asio::io_service& io_service ) :                                                                                        
        _bar( 0 ),
        _io_service( io_service ),                                                                                                      
        _strand( _io_service ),                                                                                                         
        _readPipe( _io_service ),
        _writePipe( _io_service )
    {
        boost::asio::local::connect_pair( _readPipe, _writePipe );
    }

    void set_async( int v ) {
        _strand.post( [=]
            {
                _bar = v;
                std::cout << "sending " << _bar << std::endl; 
                _writePipe.send( boost::asio::buffer( &_bar, sizeof(_bar) ) );
            }
            );
    }

    void set_sync( int v ) {
        this->set_async( v );
        int value;
        _readPipe.receive( boost::asio::buffer(&value, sizeof(value) ) );
        std::cout << "set value to " << value << std::endl;
    }


private:
    int _bar;
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::local::stream_protocol::socket _readPipe;
    boost::asio::local::stream_protocol::socket _writePipe;
};

int
main()
{
    boost::asio::io_service io_service;
    boost::asio::io_service::work w(io_service);
    std::thread t( [&]{ io_service.run(); } );
    Foo f( io_service );
    f.set_sync( 20 );
    io_service.stop();
    t.join();
}

if you are unable to use c++11 lambdas, replace them with boost::bind and some more completion handler methods.

This is what I came up with:

class synchronizer_base
{
    protected:
        synchronizer_base() :
            m_has_result(false),
            m_lock(m_mutex)
        {
        }

        void wait()
        {
            while (!m_has_result)
            {
                m_condition.wait(m_lock);
            }
        }

        void notify_result()
        {
            m_has_result = true;
            m_condition.notify_all();
        }

    private:

        boost::atomic<bool> m_has_result;
        boost::mutex m_mutex;
        boost::unique_lock<boost::mutex> m_lock;
        boost::condition_variable m_condition;
};

template <typename ResultType = void>
class synchronizer : public synchronizer_base
{
    public:

        void operator()(const ResultType& result)
        {
            m_result = result;

            notify_result();
        }

        ResultType wait_result()
        {
            wait();

            return m_result;
        }

    private:

        ResultType m_result;
};

template <>
class synchronizer<void> : public synchronizer_base
{
    public:

        void operator()()
        {
            notify_result();
        }

        void wait_result()
        {
            wait();
        }
};

And I can use it, that way:

class Foo
{
  public:
    void async_get_bar(function<void (int)> handler)
    {
      m_bar_strand.post(bind(&Foo::do_get_bar, this, value, handler));
    }

    void async_set_bar(int value, function<void ()> handler)
    {
      m_bar_strand.post(bind(&Foo::do_set_bar, this, value, handler));
    }

    int get_bar()
    {
      synchronizer<int> sync;

      async_get_bar(boost::ref(sync));

      return sync.wait_result();
    }

    void set_bar(int value)
    {
      synchronizer<void> sync;

      async_set_bar(value, boost::ref(sync));

      sync.wait_result();
    }
};

The boost::ref is necessary because the instances of synchronizer are non-copyable. This could be avoided by wrapping synchronizer in some other container-class, but I'm fine with that solution as it is.

Note: Do NOT call such "synchronized" functions from inside a handler or it might just deadlock !

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top