Question

I'm trying to make a simple network application using boost::asio. I think that I understood the basic things like io_context, async functions etc, but I really don't know how to deal with buffers.

To be clear - I know how to use boost::buffers, but I don't know how to deal with it in architectural aspect.

Let's consider that 10 clients are connected to my server. The communication works bidirectional - from server to client, and from client to server. I also have threads group, which was added two thread and each thread called io_context.run().

As I know that it means, that for example two connections can be processed (read or write) in the same time. (of course, it does not mean that the maximum connection amount is equal to threads count).

Based on above assumptations - how I should deal with buffers? Are there should be two buffers per connection (one for read, and the second for write)? If yes, are there should be two buffors * actual connections in my applications?

Or maybe I should only use two buffers, because if I have two working threads, there isn't any way, that a larger amount of buffers will be needed?

I found a lot sources about socket programming in general, but it is hard to find some sources that shows how to prepare architecture, hwo to works with communication protocols etc. If you have any good sources, I will be greatful.

UPDATE

class buff {
public:
using value_type = std::array<uint8_t, 1024>;

value_type::iterator begin() { return data_.begin(); }
value_type::iterator end() { return data_.begin(); }
value_type& data() { return data_; }
size_t& capacity() { return capacity_; }
void clear() { for (auto& x : data_) x = 0; capacity_ = 0; }
bool busy() { return busy_; }
void set_busy(bool f) { busy_ = f; }

private:
bool busy_ = false;
size_t capacity_ = 0;
value_type data_{};
};

class Buffer_provider {
private:
    std::mutex sync_;
public:
using pointer = boost::shared_ptr<Buffer_provider>;

buff* take()
{
    sync_.lock();

    auto it = std::find_if(
            buffers_.begin(),
            buffers_.end(),
            [](buff& buff)->bool { return !buff.busy(); }
    );

    if (it != buffers_.end()) {
        it->set_busy(true);
    }

    if (it == buffers_.end()) {
        throw std::runtime_error("Uuuuppss....!!!!");
    }

    sync_.unlock();

    return it;
}
void release(buff* buff)
{
    sync_.lock();

    buff->clear();
    buff->set_busy(false);

    sync_.unlock();
}

private:
    std::array<buff, 4> buffers_;
};

class connection : public boost::enable_shared_from_this<connection> {
public:
    using value_type = boost::shared_ptr<connection>;
using socket_type = boost::asio::ip::tcp::socket;

connection(boost::asio::io_context& io_context, Buffer_provider::pointer buffers)
: socket_(io_context), buffers_(buffers) { }

~connection() { std::cout << "Total: " << total << std::endl; }

boost::asio::ip::tcp::socket& socket() { return socket_; }

int total = 0;

void wait_for_data()
{
    socket_.async_wait(
            boost::asio::ip::tcp::socket::wait_read,
            boost::bind(
              &connection::read_header,
              shared_from_this()
            )
    );
}

void read_header()
{
    auto buffer = buffers_->take();

    boost::asio::async_read(
              socket_,
              boost::asio::buffer(buffer->data(), Header::size),
              boost::bind(
                &connection::read_header_done,
                shared_from_this(),
                buffer,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred
              )
      );
}

void read_header_done(buff* buffer, const boost::system::error_code& ec, const size_t& bytes_transfered)
{
    if (ec) {
        buffers_->release(buffer);

        return;
    }

    auto buffer_data_it = buffer->begin();

    Header header;
    header.read(buffer_data_it, buffer->capacity());

    buffers_->release(buffer);

    boost::asio::post(boost::bind(&connection::wait_for_data, shared_from_this()));
}

private:
    boost::asio::ip::tcp::socket socket_;
    boost::shared_ptr<Buffer_provider> buffers_;
};

Let's assume that I have e.g. 1000 active connections, and 4 thread workers which process all operations on io_context.

In this particular case, when the socket is ready to read from it, connection object takes one buffer from pool, and then try to async_read some kinds of data that is coming from the socket.

I think that this may cause situations when the buffer is already taken from pool, but using async_read will causes that this opeartion will be queued, buffer will be locked, and the thread worker will take next waiting read operation on the other socket, and as above - the next buffer will be taken.

It may lead to exceed all available amount of buffers, because as the doc say: "Regardless of whether the asynchronous operation completes immediately or not, the handler will not be invoked from within this function. Invocation of the handler will be performed in a manner equivalent to using boost::asio::io_service::post(). "

Am I correct?

How to avoid this type of situations?

What the socket::async_wait is useful for?

Should I use sync_read from socket in the async_wait completition handler?

Was it helpful?

Solution

How many buffers should I use? One per simultaneous operation.

How many simultaneous operations should I have? That depends on how you write your code.

Reading/Receiving

If you've got one thread running on a connection, and it deals with each received buffer before retrieving the next. There is no reason that you cannot reuse the buffer.

If you are reading and then passing the buffer onto a work thread/work task then that buffer cannot be immediately reused. You can overcome this by using a buffer pool which returns previously allocated buffers, or simply allocating a new buffer.

Writing/Sending

I cannot recall if boost::asio takes ownership of the buffer, or copies out the contents. So I'll address both angles.

If the stream object takes ownership of the buffer -> you should never reuse it, unless it passes it back sometime later.

If the stream object copies the contents -> you can reuse the buffer immediately.

How Many Buffers?

Particularly in asynchonous code, you need to be careful about resource allocation.

Generally we like to pretend that computers have infinite memory, or atleast enough that we don't care. Unfortunately computers do have a memory cap, and its often a lot smaller than the pointers address range. Conversely you don't want to prematurely optimise/limit yourself.

One way to cap resource allocation is write the algorithm to use at most M copies. Thus the total resources depends on the N threads in the application, which should be fixed, or somewhat configurable.

Alternately the algorithm may not have a natural resource cap, such as when it passes the buffer to a task for later execution by worker threads. In this case a buffer pool, or buffer heap can provide a constraint.

  • The buffer pool is just a list of previously allocated buffers, usually with a means of deleting excess buffer objects, or allocating new buffers should it be empty, or notifying the requestor when a buffer is returned to the pool.
  • The buffer heap is more interesting as it reserves space in full or partially in which to allocate and free buffers. The heap could potentially wire the buffer pages preventing them from being swapped to disk.

Both can be made to limit the maximum buffers in use throughout your application, by restricting the number of buffers that can be allocated.

Licensed under: CC-BY-SA with attribution
scroll top