Question

I'm having some issues when I really stress test my networking code. Essentially once the socket is set up it calls this:

NetworkStream networkStream = mClient.GetStream();
networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);


private void ReadCallback(IAsyncResult result)
        {
            try
            {
                int read;
                NetworkStream networkStream;
                try
                {
                    networkStream = mClient.GetStream();
                    read = networkStream.EndRead(result);
                }
                catch
                {
                    return;
                }

                if (read == 0)
                {
                    //The connection has been closed.
                    return;
                }

                var readBuffer = (byte[])result.AsyncState;
                var readCount = readBuffer.Length;
                while (readCount < 4)
                {
                    readCount += networkStream.Read(readBuffer, 0, readBuffer.Length - readCount);
                }
                var length = BitConverter.ToInt32(readBuffer, 0);
                var messageBuffer = new byte[length];
                readCount = 0;
                while (readCount < length)
                {
                    readCount += networkStream.Read(messageBuffer, 0, messageBuffer.Length - readCount);
                }
                else
                {
                    RaiseMessageReceived(this, messageBuffer);
                }
                //Then start reading from the network again.
                readBuffer = new byte[4]; //may not need to reset, not sure
                networkStream.BeginRead(readBuffer, 0, readBuffer.Length, ReadCallback, readBuffer);
            }
            catch(Exception)
            {
                //Connection is dead, stop trying to read and wait for a heal to retrigger the read queue
                return;
            }
        }

Then the below is my send methods

private byte[] GetMessageWithLength(byte[] bytes)
        {
            //Combine the msg length to the msg
            byte[] length = BitConverter.GetBytes(bytes.Length);
            var msg = new byte[length.Length + bytes.Length];
            Buffer.BlockCopy(length, 0, msg, 0, length.Length);
            Buffer.BlockCopy(bytes, 0, msg, length.Length, bytes.Length);
            return msg;
        }

public override bool Send(byte[] bytes)
        {
            lock (sendQueue)
            {
                sendQueue.Enqueue(bytes);
                Interlocked.Increment(ref sendQueueSize);
            }
            if (!mClient.Connected)
            {
                if (Connect())
                {
                    RaiseConnectionChanged(this, true, Localisation.TCPConnectionEstablished);
                }
                else
                {
                    RaiseConnectionChanged(this, false, (bytes.Length > 0 ? Localisation.TCPMessageFailed : Localisation.TCPMessageConnectionLost));
                }
            }

            try
            {
                NetworkStream networkStream = mClient.GetStream();

                lock (sendQueue)
                {
                    if (sendQueue.Count == 0)
                    {
                        return true;
                    }
                    bytes = sendQueue.Dequeue();
                }
                var msg = GetMessageWithLength(bytes);
                //Start async write operation
                networkStream.BeginWrite(msg, 0, msg.Length, WriteCallback, null);
            }
            catch (Exception ex)
            {
                RaiseConnectionChanged(this, false, (bytes.Length > 0 ? Localisation.TCPMessageFailed : Localisation.TCPMessageConnectionLost));
            }
            return true;
        }

        /// <summary>
        /// Callback for Write operation
        /// </summary>
        /// <param name="result">The AsyncResult object</param>
        private void WriteCallback(IAsyncResult result)
        {
            try
            {
                NetworkStream networkStream = mClient.GetStream();
                while (sendQueue.Count > 0)
                {
                    byte[] bytes;
                    lock (sendQueue)
                    {
                        if (sendQueue.Count == 0)
                        {
                            break;
                        }
                        bytes = sendQueue.Dequeue();
                    }
                    var msg = GetMessageWithLength(bytes);
                    networkStream.Write(msg, 0, msg.Length);
                    Interlocked.Decrement(ref sendQueueSize);
                }
                networkStream.EndWrite(result);
                mLastPacketSentAt = Environment.TickCount;
                Interlocked.Decrement(ref sendQueueSize);
            }
            catch (Exception ex)
            {
                RaiseConnectionChanged(this, false, Localisation.TCPMessageConnectionLost);
            }
        }

But yea, at some point when I stress test the system (say 500 or so clients sending lots of messages at once), I notice maybe 1 packet in every 4 million to just not get recieved. I'm not sure if the issue lies in the sending or the recieving, which is why I have included both methods. However I will point out that if I choose to send another packet from the client, it still sends and receives correctly, so it is not just queued or something.

Can anyone see something I am missing?

Was it helpful?

Solution

The two read loops (e.g. while (readCount < length)) are buggy. You always read at offset zero. You should read at an ever-increasing offset.

This lead to overwriting of already-read data.

Also, I'm not sure if it is a good idea to mix synchronous and asynchronous reads. You lose the benefit of asynchronous code that way and still have to deal with callbacks and such. I think you should decide on one style and stick to it.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top