Question

I really don't understand what is going wrong here, so I hope somebody may spot something I missed.

I'm writing a user daemon that accepts connections from a client I'm developing in Java. For now, this client only connects and sends the username and password. I developed the code under Cygwin, and there it works: the daemon sends its introduction, the client sends the username and password, and the daemon responds either by disconnecting the client or by sending OK (not yet implemented).

When I test this under Cygwin on localhost, it works. I ported the code to HP-UX, and there the client can connect and also receives the introduction from the daemon. But when the client sends its username and password, it no longer works: the daemon receives only one byte, and when it tries to read again, recv() returns -1 with errno EAGAIN, and nothing else arrives. The client shows no error, and neither does the daemon. When I step through the code with gdb, the messages are received completely. :(

This is the code I use; if more information is needed, I can add it:

int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();

    if(nBytes != -1 && nBytes < max)
        max = nBytes;

    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;

    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << mSocketId << " bytes: " << rcv_bytes << "  expected:" << sb.size() << " total: " << total << " errno: " << error;

        if(rcv_bytes == -1)
        {
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;

            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }

        //if(rcv_bytes == 0)
        //  throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection has been closed!");

        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }

    return total;
}

The output of the log is this:

16:16:04.391 DEBUG4: Received on socket: 4 bytes: 1  expected:32768 total: 0 errno: 2
16:16:04.391 DEBUG4: Received on socket: 4 bytes: -1  expected:32768 total: 1 errno: 11

So where is the rest of the 30 bytes, and why isn't it returned?

UPDATE

This is only the part of the code that actually receives the data. The socket class itself deals only with the raw socket, without any protocol; the protocol is implemented in a separate class. This receive function is supposed to grab as many bytes as are available on the network and put them in a buffer (SocketMessage). It doesn't matter whether those bytes span multiple messages or only part of a single message, because the controlling class constructs the actual message block out of the (possibly partial) message stream. So the fact that the first call receives only one byte is not the problem: if the message is not complete, the caller waits in a loop until more data arrives and a message is completed.

Since there can be more than one client, I'm using non-blocking sockets. I didn't want to deal with separate threads, so my server multiplexes the connections.

The problem is that receive gets only one byte while I know there should be more. The EAGAIN error code is handled, so the next time receive is entered it should get more bytes. Even if the network transmitted only one byte, the rest of the message should still arrive afterwards, but it doesn't: the select that waits for data on the socket blocks as if nothing were there. When I run the same code in gdb and step through it, it works. When I connect again with the same client, suddenly more bytes are received. When I use the same code under Cygwin on localhost, it works fine.

UPDATE

Here is the complete code.

Main.cpp

    mServerSocket = new TCPSocket(getListeningPort());
    mServerSocket->bindSocket();
    mServerSocket->setSocketBlocking(false);
    mServerSocket->listenToClient(0);

    setupSignals();
    while(isSIGTERM() == false)
    {
        try
        {
            max = prepareListening();

            //memset(&ts, 0, sizeof(struct timespec));
            pret = pselect(max+1, &mReaders, &mWriters, &mExceptions, NULL, &mSignalMask);
            error_code = errno;
            if(pret == 0)
            {
                // Timeout occurred, but we are not interested in that for now.
                // Currently this shouldn't happen anyway.
                continue;
            }
            processRequest(pret, error_code);

        }
        catch (SocketException &excp)
        {
            removeClientConnection(findClientConnection(excp.getTCPSocket()));
        }
    } // while sigTERM


BaseSocket.cpp:

#ifdef UNIX

    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>

#endif

#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/base_socket.h"

using namespace std;

BaseSocket::BaseSocket(void)
{
    mSocketId = -1;
    mBlocking = 1;  // same default as BaseSocket(int); avoids reading an uninitialized flag later
    mSendBufferSize = MAX_SEND_LEN;
}

BaseSocket::BaseSocket(int pNumber)
{
    mSocketId = -1;
    mPortNumber = pNumber;
    mBlocking = 1;
    mSendBufferSize = MAX_SEND_LEN;

    try
    {
        if ((mSocketId = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONSTRUCTOR, errno, "Socket", "unix: error in socket constructor", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    /*
     set the initial address of client that shall be communicated with to
     any address as long as they are using the same port number.
     The clientAddr structure is used in the future for storing the actual
     address of client applications with which communication is going
     to start
     */
    mClientAddr.sin_family = AF_INET;
    mClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    mClientAddr.sin_port = htons(mPortNumber);
    updateSendBufferSize(MAX_SEND_LEN);
}

void BaseSocket::updateSendBufferSize(int nNewSize)
{
    mSendBufferSize = getSendBufferSize();
    if(mSendBufferSize > nNewSize)
        mSendBufferSize = nNewSize;
}

BaseSocket::~BaseSocket(void)
{
    close();
}

void BaseSocket::setSocketId(int socketFd)
{
    mSocketId = socketFd;
}

int BaseSocket::getSocketId()
{
    return mSocketId;
}

// returns the port number
int BaseSocket::getPortNumber()
{
    return mPortNumber;
}

void BaseSocket::setDebug(int debugToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (char *) &debugToggle, sizeof(debugToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setReuseAddr(int reuseToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (char *) &reuseToggle,
              sizeof(reuseToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setKeepAlive(int aliveToggle)
{
    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (char *) &aliveToggle,
              sizeof(aliveToggle)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setLingerSeconds(int seconds)
{
    struct linger lingerOption;

    if (seconds > 0)
    {
        lingerOption.l_linger = seconds;
        lingerOption.l_onoff = 1;
    }
    else
        lingerOption.l_onoff = 0;

    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
              sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setLingerOnOff(bool lingerOn)
{
    struct linger lingerOption;

    if (lingerOn)
        lingerOption.l_onoff = 1;
    else
        lingerOption.l_onoff = 0;

    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
              sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setSendBufferSize(int sendBufSize)
{
    if (setsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (char *) &sendBufSize, sizeof(sendBufSize)) == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error send buffer size", __FILE__, __LINE__);
#endif
    }
    updateSendBufferSize(sendBufSize);
}

void BaseSocket::setReceiveBufferSize(int receiveBufSize)
{
    if (setsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (char *) &receiveBufSize, sizeof(receiveBufSize)) == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set receive buffer size", __FILE__, __LINE__);
#endif
    }
}

int BaseSocket::isSocketBlocking()
{
    return mBlocking;
}

void BaseSocket::setSocketBlocking(int blockingToggle)
{
    if (blockingToggle)
    {
        if (isSocketBlocking())
            return;
        else
            mBlocking = 1;
    }
    else
    {
        if (!isSocketBlocking())
            return;
        else
            mBlocking = 0;
    }

    try
    {
#ifdef UNIX
        int flags;
        if (-1 == (flags = fcntl(mSocketId, F_GETFL, 0)))
            flags = 0;

        if(mBlocking)
            fcntl(mSocketId, F_SETFL, flags & (~O_NONBLOCK));
        else
            fcntl(mSocketId, F_SETFL, flags | O_NONBLOCK);

        /*if (ioctl(socketId, FIONBIO, (char *) &blocking) == -1)
        {
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set socket blocking");
        }*/
#endif
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

int BaseSocket::getDebug()
{
    int myOption;
    int myOptionLen = sizeof(myOption);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return myOption;
}

int BaseSocket::getReuseAddr()
{
    int myOption;
    int myOptionLen = sizeof(myOption);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return myOption;
}

int BaseSocket::getKeepAlive()
{
    int myOption;
    int myOptionLen = sizeof(myOption);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (void *) &myOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return myOption;
}

int BaseSocket::getLingerSeconds()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return lingerOption.l_linger;
}

bool BaseSocket::getLingerOnOff()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    if (lingerOption.l_onoff == 1)
        return true;
    else
        return false;
}

int BaseSocket::getSendBufferSize()
{
    int sendBuf;
    int myOptionLen = sizeof(sendBuf);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (void *)&sendBuf, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get send buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return sendBuf;
}

int BaseSocket::getReceiveBufferSize()
{
    int rcvBuf;
    int myOptionLen = sizeof(rcvBuf);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (void *) &rcvBuf, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get receive buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }
    return rcvBuf;
}

ostream &operator<<(ostream& io, BaseSocket& s)
{
    string flagStr = "";

    io << endl;
    io << "Summary of socket settings:" << endl;
    io << "   Socket Id:     " << s.getSocketId() << endl;
    io << "   port #:        " << s.getPortNumber() << endl;
    io << "   debug:         " << (flagStr = s.getDebug() ? "true" : "false")
          << endl;
    io << "   reuse addr:    " << (flagStr = s.getReuseAddr() ? "true" : "false")
          << endl;
    io << "   keep alive:    " << (flagStr = s.getKeepAlive() ? "true" : "false")
          << endl;
    io << "   send buf size: " << s.getSendBufferSize() << endl;
    io << "   recv buf size: " << s.getReceiveBufferSize() << endl;
    io << "   blocking:      "
          << (flagStr = s.isSocketBlocking() ? "true" : "false") << endl;
    io << "   linger on:     "
          << (flagStr = s.getLingerOnOff() ? "true" : "false") << endl;
    io << "   linger seconds: " << s.getLingerSeconds() << endl;
    io << endl;
    return io;
}

void BaseSocket::close(void)
{
    ::close(mSocketId);
}

TCPSocket.cpp:


#ifdef UNIX
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/ioctl.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <errno.h>
#endif

#include <sstream>

#include "support/logging/log.h"
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/tcp_socket.h"

using namespace std;

const int MSG_HEADER_LEN = 6;

TCPSocket::TCPSocket()
: BaseSocket()
{
}

TCPSocket::TCPSocket(int portId)
: BaseSocket(portId)
{
}

TCPSocket::~TCPSocket()
{
}

void TCPSocket::initialize()
{
}

void TCPSocket::bindSocket()
{
    try
    {
        if (bind(mSocketId, (struct sockaddr *) &mClientAddr, sizeof(struct sockaddr_in)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::BIND, errno, "Socket", "unix: error calling bind()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void TCPSocket::connectToServer(string& serverNameOrAddr, hostType hType)
{
    /* 
     when this method is called, a client socket has been built already,
     so we have the socketId and portNumber ready.

     a HostInfo instance is created, no matter how the server's name is
     given (such as www.yuchen.net) or the server's address is given (such
     as 169.56.32.35), we can use this HostInfo instance to get the
     IP address of the server
     */

    HostInfo serverInfo(serverNameOrAddr, hType);

    // Store the IP address and socket port number
    struct sockaddr_in serverAddress;

    serverAddress.sin_family = AF_INET;
    serverAddress.sin_addr.s_addr = inet_addr(
          serverInfo.getHostIPAddress().c_str());
    serverAddress.sin_port = htons(mPortNumber);

    // Connect to the given address
    try
    {
        if (connect(mSocketId, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONNECT, errno, "Socket", "unix: error calling connect()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

TCPSocket *TCPSocket::acceptClient(string& clientHost)
{
    int newSocket; // the new socket file descriptor returned by the accept system call

    // the length of the client's address
    int clientAddressLen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientAddress;    // Address of the client that sent data

    // Accepts a new client connection and stores its socket file descriptor
    try
    {
        if ((newSocket = accept(mSocketId, (struct sockaddr *) &clientAddress, &clientAddressLen)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::ACCEPT, errno, "Socket", "unix: error calling accept()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return NULL;
    }

    // Get the host name given the address
    char *sAddress = inet_ntoa((struct in_addr) clientAddress.sin_addr);
    HostInfo clientInfo(sAddress, ADDRESS);
    clientHost += clientInfo.getHostName();

    // Create and return the new TCPSocket object
    TCPSocket* retSocket = new TCPSocket();
    retSocket->setSocketId(newSocket);
    return retSocket;
}

void TCPSocket::listenToClient(int totalNumPorts)
{
    try
    {
        if (listen(mSocketId, totalNumPorts) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::LISTEN, errno, "Socket", "unix: error calling listen()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

ostream &operator<<(ostream &oStream, const TCPSocket &oSocket)
{
    oStream << oSocket.mSocketId;
    return oStream;
}

int TCPSocket::send(SocketMessage const &oBuffer, int nSize)
{
    int numBytes;  // the number of bytes sent
    int error = errno;

    if(nSize == -1)
        nSize = oBuffer.size();

    if((unsigned int)nSize > oBuffer.size())
    {
        std::stringstream ss;
        ss << "Invalid Buffersize! Requested: " << (unsigned int)nSize << " Provided: " << oBuffer.size();
        std::string s = ss.str();  // operator>> would stop at the first space
        FILE_LOG(logERROR) << s;
        throw SocketException(this, SocketException::SEND, 0, "Socket", s, __FILE__, __LINE__);
    }

    // Sends the message to the connected host
    try
    {
        FILE_LOG(logDEBUG4) << "Sending on socket: "<< mSocketId << " bytes:" << nSize;
        numBytes = ::send(mSocketId, &oBuffer[0], nSize, 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Sent on socket: "<< mSocketId << " bytes:" << nSize << " errno: " << error;
        if(numBytes == -1)
        {
#ifdef UNIX
            if(error == EAGAIN || error == EWOULDBLOCK)
            {
                return -1;
            }
            else
                throw SocketException(this, SocketException::SEND, error, "Socket", "unix: error calling send()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    return numBytes;
}

int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();

    if(nBytes != -1 && nBytes < max)
        max = nBytes;

    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;

    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << getSocketId() << " bytes: " << rcv_bytes << "  expected:" << sb.size() << " total: " << total << " errno: " << error;

        if(rcv_bytes == -1)
        {
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;

            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }

        // Socket has been closed.
        if(rcv_bytes == 0)
            return total;

        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }

    return total;
}

void TCPSocket::close(void)
{
    BaseSocket::close();
}

Solution

Are you sure Nagle's algorithm isn't kicking in here? If you haven't disabled it by setting the TCP_NODELAY socket option on the sending side, small writes may be held back until either the previously sent data has been acknowledged or a full segment's worth of data (MSS) is available.
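Because Nagle's algorithm runs on the sender, the option matters on whichever side writes the small messages: for the Java client that is `java.net.Socket.setTcpNoDelay(true)`, and on the C++ daemon it could be applied to each accepted socket for its own replies. A minimal POSIX sketch of the C++ side; `disableNagle` is a hypothetical helper, not part of the question's `BaseSocket` class.

```cpp
#include <netinet/in.h>   // IPPROTO_TCP
#include <netinet/tcp.h>  // TCP_NODELAY
#include <sys/socket.h>   // setsockopt

// Hypothetical helper: disables Nagle's algorithm on a TCP socket so that
// small writes are sent immediately instead of being coalesced.
// Returns true on success; on failure errno holds the reason.
bool disableNagle(int fd) {
    int on = 1;
    return ::setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on)) == 0;
}
```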

Other tips

A couple of questions first:
- Why are you using non-blocking I/O?
- You apparently know the message should be 30 bytes long, so why are you asking for 32768 bytes?

There's a lot more to sockets than just calling recv if you are using non-blocking I/O. With blocking I/O, every error is a true error. With non-blocking I/O you have to deal with that pesky EAGAIN / EWOULDBLOCK error. Recovery is possible, but you are responsible for that recovery when you configure the device to use non-blocking I/O.
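A minimal sketch of what that responsibility can look like in a receive loop, assuming a POSIX system; `DrainStatus` and `drainSocket` are hypothetical names. Unlike the question's receive(), it reads errno only after recv() has returned -1 (on a successful call errno is not updated, so the "errno: 2" in the question's log is just a stale value), and it keeps "no data yet" separate from "peer closed the connection".

```cpp
#include <cerrno>
#include <sys/socket.h>
#include <sys/types.h>
#include <vector>

// Outcome of draining a non-blocking socket.
enum class DrainStatus {
    WouldBlock,  // nothing more right now: go back to select()/poll()
    PeerClosed,  // recv() returned 0: the other side shut down its end
    Error        // a real error; errno holds the reason
};

// Reads everything currently available on a non-blocking socket into 'out'.
// EAGAIN/EWOULDBLOCK means "come back later", not an error or end of message.
DrainStatus drainSocket(int fd, std::vector<char>& out) {
    char chunk[4096];
    for (;;) {
        ssize_t n = ::recv(fd, chunk, sizeof(chunk), 0);
        if (n > 0) {
            out.insert(out.end(), chunk, chunk + n);
            continue;  // there may be more already queued
        }
        if (n == 0)
            return DrainStatus::PeerClosed;
        // n == -1: only now is errno meaningful.
        if (errno == EINTR)
            continue;  // interrupted by a signal, just retry
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return DrainStatus::WouldBlock;
        return DrainStatus::Error;
    }
}
```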

As the first name (EAGAIN) suggests, getting this error means you need to try again, preferably after waiting a bit. A simple but not very good way to wait is to sleep (with sleep, usleep, or nanosleep) for some amount of time. The problem is that you might wait too long or not long enough. Wait too long and your system might become unresponsive, or the sender might go away. Wait too little and you make the computer thrash between privileged and non-privileged mode.

The best way to wait for such an event is to use an event-based scheme (such as epoll on Linux or kqueue on the BSDs), but unfortunately those schemes aren't portable. A portable scheme is to use select or poll. You can make select or poll wait indefinitely, or you can specify a timeout. I find poll a lot easier to use, particularly when only one file descriptor is involved. That's personal preference, however; others find select easier to use.
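A minimal sketch of the poll approach for a single descriptor; `waitReadable` is a hypothetical helper, and an EINTR is simply retried, which restarts the full timeout.

```cpp
#include <cerrno>
#include <poll.h>

// Hypothetical helper: waits until 'fd' is readable.
// timeoutMs < 0 waits indefinitely; 0 only checks and returns immediately.
// Returns 1 if recv() should be called now, 0 on timeout, -1 on error (errno set).
int waitReadable(int fd, int timeoutMs) {
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    for (;;) {
        int rc = ::poll(&pfd, 1, timeoutMs);
        if (rc == -1 && errno == EINTR)
            continue;  // interrupted by a signal: retry with the full timeout
        if (rc <= 0)
            return rc;  // 0 = timeout, -1 = real error
        // POLLIN, or POLLHUP/POLLERR: recv() will now return data, 0 (EOF) or the error.
        return 1;
    }
}
```

After it returns 1, the caller reads until EAGAIN and then goes back to waiting, rather than treating EAGAIN as the end of the message.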

The number of bytes returned by any one call to recv is unpredictable. Many messages are received in several parts, so you need to call recv again if you don't yet have the entire message. But your code does not seem to have a way to determine whether the whole message has been received. Also, your code returns on EWOULDBLOCK, but EWOULDBLOCK is a normal part of non-blocking socket operation: it signifies neither an error nor the completion of a message.
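One common way to know whether a whole message has arrived is to frame each message, for example with a length prefix. The question's protocol class isn't shown (only `MSG_HEADER_LEN = 6` appears in TCPSocket.cpp), so this sketch assumes a hypothetical 4-byte big-endian length header; `MessageAssembler` is an illustrative name, not the asker's class.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical framing: a 4-byte big-endian payload length, then the payload.
const std::size_t kFrameHeaderLen = 4;

// Collects bytes from a stream socket and hands out complete messages.
class MessageAssembler {
public:
    // Append whatever one recv() call returned, however small the chunk.
    void feed(const char* data, std::size_t len) {
        buffer_.insert(buffer_.end(), data, data + len);
    }

    // If a complete message is buffered, stores its payload in 'payload',
    // removes it from the buffer and returns true; otherwise returns false.
    bool next(std::string& payload) {
        if (buffer_.size() < kFrameHeaderLen)
            return false;  // header not complete yet
        std::uint32_t len = 0;
        for (std::size_t i = 0; i < kFrameHeaderLen; ++i)
            len = (len << 8) | static_cast<unsigned char>(buffer_[i]);
        if (buffer_.size() - kFrameHeaderLen < len)
            return false;  // payload not complete yet
        std::vector<char>::iterator begin = buffer_.begin() + kFrameHeaderLen;
        std::vector<char>::iterator end = begin + len;
        payload.assign(begin, end);
        buffer_.erase(buffer_.begin(), end);  // keep any bytes of the next message
        return true;
    }

private:
    std::vector<char> buffer_;
};
```

Each chunk from a non-blocking read goes into feed(), and next() is called until it returns false; a one-byte read, as in the question's log, simply leaves the assembler waiting for more.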

Licensed under: CC-BY-SA with attribution