Question

I'm trying to make simple tcp requests to a server using async_read and async_write with a timeout.

The problem is that async_read fails when I try to read until the end of the transmission: it seems to stop at the first '\n' and returns an error (end of file).

When reading the response line by line (when eots->at(last_request) == "\n"), it successfully reads the whole response.

if(eots->at(last_request)=="") // read until end
        {
             boost::asio::async_read(
                socket_
                , input_buffer_
                , boost::asio::transfer_at_least(1) // read until end or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, boost::asio::placeholders::error)
                );
        }else
        {
            boost::asio::async_read_until(
                socket_
                , input_buffer_
                , eots->at(last_request) // read until current request end of transmission sign/string or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, _1)
                );
        }

Is this expected behavior? Am I doing it right?

For testing, I ran a whois query (args: whois.iana.org 43 com).

The full code:

/*
 * MK async TCP
 * contains basic definitions for extractions and string filtering
 *
 */
#ifndef MK_ASYNC_TCP_HPP
#define MK_ASYNC_TCP_HPP
//
// async_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/placeholders.hpp>
//#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <string>
#include <vector>

using boost::asio::deadline_timer;
using boost::asio::ip::tcp;

//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by an "actor" that persists for the lifetime of the
// tcp_client object:
//
//  +----------------+
//  |                |
//  | check_deadline |<---+
//  |                |    |
//  +----------------+    | async_wait()
//              |         |
//              +---------+
//
// If the deadline actor determines that the deadline has expired, the socket
// is closed and any outstanding operations are consequently cancelled.
//
// Connection establishment involves trying each endpoint in turn until a
// connection is successful, or the available endpoints are exhausted. If the
// deadline actor closes the socket, the connect actor is woken up and moves to
// the next endpoint.
//
//  +---------------+
//  |               |
//  | start_connect |<---+
//  |               |    |
//  +---------------+    |
//           |           |
//  async_-  |    +----------------+
// connect() |    |                |
//           +--->| handle_connect |
//                |                |
//                +----------------+
//                          :
// Once a connection is     :
// made, the connect        :
// actor forks in two -     :
//                          :
// an actor for reading     :       and an actor for
// inbound messages:        :       sending heartbeats:
//                          :
//  +------------+          :          +-------------+
//  |            |<- - - - -+- - - - ->|             |
//  | start_read |                     | start_write |<---+
//  |            |<---+                |             |    |
//  +------------+    |                +-------------+    | async_wait()
//          |         |                        |          |
//  async_- |    +-------------+       async_- |    +--------------+
//   read_- |    |             |       write() |    |              |
//  until() +--->| handle_read |               +--->| handle_write |
//               |             |                    |              |
//               +-------------+                    +--------------+
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character. The deadline for a complete message is 30 seconds.
//
// The heartbeat actor sends a heartbeat (a message that consists of a single
// newline character) every 10 seconds. In this example, no deadline is applied
// to message sending.
//
class tcp_client
{
    public:

      tcp_client(boost::asio::io_service& io_service, std::vector<std::string> * requests , std::vector<std::string>  * responses , std::vector<std::string>  * eots, unsigned int request_timeout = 30, unsigned int connect_timeout = 10)
        : stopped_(false),
          socket_(io_service),
          deadline_(io_service),
          heartbeat_timer_(io_service),
          requests(requests),
          responses(responses),
          eots(eots),
          request_timeout(request_timeout),
          connect_timeout(connect_timeout)
      {
        if(eots->size()==0)
        {
            for(unsigned long i=0 ; i<(requests->size()-1); i++)
            {
                eots->push_back("\n");
            }
            eots->push_back("");
        }
        if(responses->size()==0)
        {
            responses->resize(requests->size());
        }
        if( (eots->size() != requests->size()) || (requests->size() != responses->size()) )
        {
            std::cerr<<std::endl<<"wrong nr of parameters"<<std::endl;
            return;
        }
      }

      // Called by the user of the tcp_client class to initiate the connection process.
      // The endpoint iterator will have been obtained using a tcp::resolver.
      void start(tcp::resolver::iterator endpoint_iter)
      {
        // Start the connect actor.
        start_connect(endpoint_iter);

        // Start the deadline actor. You will note that we're not setting any
        // particular deadline here. Instead, the connect and input actors will
        // update the deadline prior to each asynchronous operation.
        deadline_.async_wait(boost::bind(&tcp_client::check_deadline, this));
      }

      // This function terminates all the actors to shut down the connection. It
      // may be called by the user of the tcp_client class, or by the class itself in
      // response to graceful termination or an unrecoverable error.
      void stop()
      {
        stopped_ = true;
        boost::system::error_code ignored_ec;
        socket_.close(ignored_ec);
        deadline_.cancel();
        heartbeat_timer_.cancel();
      }

    private:
      void start_connect(tcp::resolver::iterator endpoint_iter)
      {
        if (endpoint_iter != tcp::resolver::iterator())
        {
          std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";

          // Set a deadline for the connect operation.
          deadline_.expires_from_now(boost::posix_time::seconds(connect_timeout));

          // Start the asynchronous connect operation.
          socket_.async_connect(endpoint_iter->endpoint(),
              boost::bind(&tcp_client::handle_connect,
                this, _1, endpoint_iter));
        }
        else
        {
          // There are no more endpoints to try. Shut down the client.
          stop();
        }
      }
      void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter)
      {
        if (stopped_)
          return;

        // The async_connect() function automatically opens the socket at the start
        // of the asynchronous operation. If the socket is closed at this time then
        // the timeout handler must have run first.
        if (!socket_.is_open())
        {
          std::cout << "Connect timed out\n";

          // Try the next available endpoint.
          start_connect(++endpoint_iter);
        }

        // Check if the connect operation failed before the deadline expired.
        else if (ec)
        {
          std::cout << "Connect error: " << ec.message() << "\n";

          // We need to close the socket used in the previous connection attempt
          // before starting a new one.
          socket_.close();

          // Try the next available endpoint.
          start_connect(++endpoint_iter);
        }

        // Otherwise we have successfully established a connection.
        else
        {
          std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
          boost::asio::socket_base::keep_alive option(true);
          socket_.set_option(option);

          //~ // Start the input actor.
          //~ start_read();

          //~ // Start the heartbeat actor.
          //~ start_write();
          deadline_.expires_from_now(boost::posix_time::seconds(this->request_timeout));
          do_Requests_write();


        }
      }


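      void handle_Requests_finish()
      {
        // Move on only if another request remains; otherwise
        // requests->at(last_request) would throw std::out_of_range.
        if(last_request + 1 < requests->size())
        {
            last_request++;
            do_Requests_write();
        }else
        {
            stop();
        }
      }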

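      void do_Requests_write()
      {
        if (stopped_)
          return;

        // Start an asynchronous operation to send the current request.
        // The buffers must stay valid until the handler runs, so send the
        // stored request and a static newline as two buffers instead of
        // concatenating them into a temporary string.
        static const char newline = '\n';
        std::vector<boost::asio::const_buffer> buffers;
        buffers.push_back(boost::asio::buffer(requests->at(last_request)));
        buffers.push_back(boost::asio::buffer(&newline, 1));
        boost::asio::async_write(
            socket_
            , buffers
            , boost::bind(&tcp_client::do_Requests_write_handle, this, _1)
            );
      }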

      void do_Requests_write_handle(const boost::system::error_code& ec)
      {
        if (stopped_)
          return;

        if (!ec)
        {
          do_Requests_read();
        }
        else
        {
          std::cout << "Error do_Requests_write_handle: " << ec.message() << "\n";

          stop();
        }
      }

      void do_Requests_read()
      {
        // Set a deadline for the read operation.
        deadline_.expires_from_now(boost::posix_time::seconds(this->request_timeout));

        // Start an asynchronous read for the response to the current request.
        if(eots->at(last_request)=="") // read until end
        {
             boost::asio::async_read(
                socket_
                , input_buffer_
                , boost::asio::transfer_at_least(1) // read until end or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, boost::asio::placeholders::error)
                );
        }else
        {
            boost::asio::async_read_until(
                socket_
                , input_buffer_
                , eots->at(last_request) // read until current request end of transmission sign/string or error
                , boost::bind(&tcp_client::do_Requests_read_handle, this, _1)
                );
        }
      }

      void do_Requests_read_handle(const boost::system::error_code& ec)
      {
        if (stopped_)
          return;

        if (!ec)
        {
          // Extract the next whitespace-delimited word from the buffer.
          //~ std::string line;
          //~ std::istream is(&input_buffer_);
          //~ std::getline(is, line);
          std::istream response_istream(&input_buffer_);
          std::string response;
          response_istream >> response;

          // Print the extracted word and append it to the current response.
          std::cout << "Received: " << response << "\n";
          responses->at(last_request)+=response+"\n";
          //~ if (!line.empty())
          //~ {
            //~ std::cout << "Received: " << line << "\n";
          //~ }

          do_Requests_read();
        }
        else
        {
            std::cout<<(std::string)"Error on receive: " + ec.message() + "\n";
            responses->at(last_request)+= (std::string)"Error on receive: " + ec.message() + "\n";
            handle_Requests_finish();
        }
      }

      void check_deadline()
      {
        if (stopped_)
          return;

        // Check whether the deadline has passed. We compare the deadline against
        // the current time since a new asynchronous operation may have moved the
        // deadline before this actor had a chance to run.
        if (deadline_.expires_at() <= deadline_timer::traits_type::now())
        {
          // The deadline has passed. The socket is closed so that any outstanding
          // asynchronous operations are cancelled.
          socket_.close();

          // There is no longer an active deadline. The expiry is set to positive
          // infinity so that the actor takes no action until a new deadline is set.
          deadline_.expires_at(boost::posix_time::pos_infin);
        }

        // Put the actor back to sleep.
        deadline_.async_wait(boost::bind(&tcp_client::check_deadline, this));
      }

    private:
        bool stopped_;
        tcp::socket socket_;
        boost::asio::streambuf input_buffer_;
        deadline_timer deadline_;
        deadline_timer heartbeat_timer_;
        std::vector<std::string> *requests, *responses, *eots;
        unsigned int last_request=0;
        unsigned int request_timeout = 30;
        unsigned int connect_timeout = 10;
};

int main(int argc, char* argv[])
{
    std::vector<std::string> requests, responses, eots;
  try
  {
    if (argc < 4)
    {
      std::cerr << "Usage: tcp_client <host> <port> <query1> <query2> <query3> [..]\n";
      return 1;
    }
    for(int i = 3; i<argc ; i++ )
    {
        requests.push_back(argv[i]);
        eots.push_back("");
        responses.push_back("");
    }

    boost::asio::io_service io_service;
    tcp::resolver r(io_service);
    tcp_client c(io_service,&requests,&responses,&eots);

    c.start(r.resolve(tcp::resolver::query(argv[1], argv[2])));

    io_service.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }

  return 0;
}
#endif // MK_ASYNC_TCP_HPP

Solution

The async_read() operation completes with an error code of boost::asio::error::eof because the end of the stream has been reached, not because the first \n has been reached. The output only appears to stop early because the handler reads from input_buffer_ word by word, extracting a single word each time it runs.
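To illustrate the word-by-word point, here is a minimal sketch that uses std::stringbuf as a stand-in for boost::asio::streambuf (both derive from std::streambuf). The helper names take_one_word and take_everything and the sample text are hypothetical and not part of the question's code.

```cpp
#include <istream>
#include <iterator>
#include <sstream>
#include <string>

// Extracts a single whitespace-delimited word, as the question's
// read handler does with `response_istream >> response`.
std::string take_one_word(std::streambuf& buf)
{
    std::istream is(&buf);
    std::string word;
    is >> word;
    return word;
}

// Drains every byte currently held in the buffer.
std::string take_everything(std::streambuf& buf)
{
    return std::string(std::istreambuf_iterator<char>(&buf),
                       std::istreambuf_iterator<char>());
}
```

Calling take_one_word once per completed read leaves most of the received data sitting in the buffer, which is likely why only a few words are printed before the eof error shows up. Draining the buffer in the handler, and once more when eof is reported, would recover the rest.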

The response to com on whois.iana.org:43 is 1830 bytes.

$ nc whois.iana.org 43 | wc --bytes
com
1830

When boost::asio::streambuf is provided to the read operation, it will attempt to allocate buffers of an unspecified size into which data can be read. The current implementation will attempt to allocate a buffer with a size of 512. Hence, if 1830 bytes are received, and each read operation reads up to the buffer max of 512 bytes, then all received bytes will have been read on the 4th read operation. Thus, the 5th read operation will result in an end of file.
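The arithmetic can be checked with a small simulation. This is only a model, assuming every read returns a full chunk until the data runs out; real network reads may return fewer bytes, so four reads is a lower bound. The function name simulate_reads is hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Returns the number of bytes delivered by each successive read when
// `total_bytes` arrive and every read takes at most `chunk_size` bytes.
// The trailing 0 stands for the read that reports end of file.
// Returns an empty vector if chunk_size is 0.
std::vector<std::size_t> simulate_reads(std::size_t total_bytes,
                                        std::size_t chunk_size)
{
    std::vector<std::size_t> reads;
    if (chunk_size == 0)
        return reads;

    std::size_t remaining = total_bytes;
    while (remaining > 0)
    {
        const std::size_t n = std::min(remaining, chunk_size);
        reads.push_back(n);
        remaining -= n;
    }
    reads.push_back(0); // the read that finds the stream closed
    return reads;
}
```

For simulate_reads(1830, 512) the model yields reads of 512, 512, 512 and 294 bytes, followed by the fifth read that reports end of file.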

The completion condition for async_read_until() results in slightly different behavior. The operation is considered complete when either the streambuf contains the specified delimiter or an error occurs. When async_read_until() completes, the streambuf may contain additional data beyond the delimiter. If that additional data itself contains a delimiter, a subsequent call to async_read_until() will have its completion condition met without needing to call the AsyncReadStream's async_read_some() function.
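The completion check described above can be modeled on a plain std::string. This is a simplified sketch of the idea, not Boost.Asio's implementation, and both function names are hypothetical.

```cpp
#include <cstddef>
#include <string>

// Model of the completion check: returns the number of bytes up to and
// including the first delimiter, or 0 when the buffered data holds no
// delimiter and the stream would have to be read again.
std::size_t bytes_through_delimiter(const std::string& buffered,
                                    const std::string& delim)
{
    if (delim.empty())
        return 0;
    const std::string::size_type pos = buffered.find(delim);
    return pos == std::string::npos ? 0 : pos + delim.size();
}

// Counts how many consecutive read-until calls could complete from the
// bytes already buffered, consuming one delimited message per call.
std::size_t messages_available(std::string buffered,
                               const std::string& delim)
{
    std::size_t count = 0;
    std::size_t n = 0;
    while ((n = bytes_through_delimiter(buffered, delim)) != 0)
    {
        buffered.erase(0, n);
        ++count;
    }
    return count;
}
```

In this model, a buffer holding "a\nbb\nccc" satisfies two read-until calls for "\n" without touching the socket, and only the third call would need more data. That matches why the line-by-line variant in the question delivers the whole response.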

OTHER TIPS

If you want to read the whole transmission as it is received, try using socket_.async_read_some and replacing the input buffer with an array or vector. See Short reads and writes.

Licensed under: CC-BY-SA with attribution