Question

I am having an issue where an object holding a byte[20] is added to a BlockingCollection on one thread, but another thread gets that object back from BlockingCollection.Take() with a byte[0]. I think this is a threading issue, but I do not know where or why it is happening, considering that BlockingCollection is a concurrent collection.

Sometimes on thread2, myclass2.mybytes equals byte[0]. Any information on how to fix this is greatly appreciated.

[EDIT] The original code. I removed the above code that seemed to run just fine so I took the time to go through my original code and post it.

MessageBuffer.cs

/// <summary>
/// A <see cref="BlockingCollection{T}"/> of <see cref="Message"/> instances.
/// Adds no members of its own; it only gives the collection a dedicated type name.
/// </summary>
public class MessageBuffer : BlockingCollection<Message> { }

In the class that has Listener() and ReceivedMessageHandler(object messageProcessor)

// Queue of received messages shared between the two threads:
// Listener() adds to it and ReceivedMessageHandler() takes from it.
private MessageBuffer RecievedMessageBuffer;

On Thread1

    /// <summary>
    /// Receive loop: while <c>IsListening</c> is true, reads messages from the
    /// stream and adds every non-null one to <c>RecievedMessageBuffer</c>.
    /// Any exception ends in a call to <c>OnDisconnected()</c>; the loop itself
    /// only stops once <c>IsListening</c> becomes false.
    /// </summary>
    private void Listener()
    {
        while (this.IsListening)
        {
            try
            {
                Message received = Message.ReadMessage(this.Stream, this);
                if (received == null)
                {
                    // Nothing usable was read this round; try again.
                    continue;
                }

                this.RecievedMessageBuffer.Add(received);
            }
            catch (IOException ioError)
            {
                // An IOException on a client that is no longer connected is not logged;
                // while still connected it is logged first.
                if (this.Client.Connected)
                {
                    Logger.LogException(ioError.ToString());
                }

                this.OnDisconnected();
            }
            catch (Exception error)
            {
                Logger.LogException(error.ToString());
                this.OnDisconnected();
            }
        }
    }

Message.ReadMessage(NetworkStream stream, iTcpConnectClient client)

    /// <summary>
    /// Reads one message from <paramref name="stream"/>: a single type byte that
    /// selects the message class, followed by whatever <c>ReadData</c> consumes.
    /// </summary>
    /// <param name="stream">Stream to read the type byte and message data from.</param>
    /// <param name="client">Client assigned to the new message's <c>Client</c> member.</param>
    /// <returns>
    /// The fully read message, or <c>null</c> when no type byte is available,
    /// when the buffer is shorter than <c>HeaderSize + MessageSize</c>, or when
    /// an unexpected (non-IO) error interrupts the read.
    /// </returns>
    /// <exception cref="IOException">
    /// The type byte is not registered in <c>IDTOCLASS</c>, or reading failed
    /// with an I/O error. The exception is logged and rethrown.
    /// </exception>
    public static Message ReadMessage(NetworkStream stream, iTcpConnectClient client)
    {
        try
        {
            // ReadByte yields -1 when the end of the stream has been reached.
            int classType = stream.ReadByte();
            if (classType == -1)
            {
                return null;
            }

            if (!Message.IDTOCLASS.ContainsKey((byte)classType))
            {
                throw new IOException("Class type not found");
            }

            Message message = Message.GetNewMessage((byte)classType);
            message.Client = client;
            message.ReadData(stream);

            // Reject messages whose buffer cannot hold the header plus the declared payload.
            if (message.Buffer.Length < message.MessageSize + Message.HeaderSize)
            {
                return null;
            }

            return message;
        }
        catch (IOException ex)
        {
            Logger.LogException(ex.ToString());
            // Plain "throw;" keeps the original stack trace ("throw ex;" would reset it).
            throw;
        }
        catch (Exception ex)
        {
            Logger.LogException(ex.ToString());
            // Never hand back a partially read message: previously a failure inside
            // ReadData fell through and returned an instance whose Buffer had not been
            // filled, which consumers then tried to decode.
            return null;
        }
    }

On Thread2

    /// <summary>
    /// Consumer loop: while <c>IsListening</c> is true, takes the next message
    /// from <c>RecievedMessageBuffer</c>, calls <c>Reconstruct()</c> on it and
    /// dispatches it.
    /// </summary>
    /// <param name="messageProcessor">
    /// Optional processor passed to <c>HandleMessage(object)</c>; when null the
    /// parameterless <c>HandleMessage()</c> is used instead.
    /// </param>
    private void ReceivedMessageHandler(object messageProcessor)
    {
        // The argument never changes, so decide once which overload to call.
        bool hasProcessor = messageProcessor != null;

        while (this.IsListening)
        {
            Message next = this.RecievedMessageBuffer.Take();
            next.Reconstruct();

            if (hasProcessor)
            {
                next.HandleMessage(messageProcessor);
            }
            else
            {
                next.HandleMessage();
            }
        }
    }

PlayerStateMessage.cs

/// <summary>
/// Message carrying a player's state: a position written at payload offset 0
/// and a rotation written at payload offset 8.
/// </summary>
public class PlayerStateMessage : Message
{
    /// <summary>The player state read from, or written to, the message buffer.</summary>
    public GameObject PlayerState;

    /// <summary>
    /// Payload size reported for this message type (12).
    /// NOTE(review): presumably 8 bytes of position plus 4 bytes of rotation — confirm.
    /// </summary>
    public override int MessageSize
    {
        get
        {
            return 12;
        }
    }

    /// <summary>Creates a message with a fresh, default <see cref="GameObject"/>.</summary>
    public PlayerStateMessage()
    {
        PlayerState = new GameObject();
    }

    /// <summary>Creates a message wrapping an existing player state.</summary>
    /// <param name="playerState">State object stored in <see cref="PlayerState"/>.</param>
    public PlayerStateMessage(GameObject playerState)
    {
        PlayerState = playerState;
    }

    /// <summary>
    /// Fills <see cref="PlayerState"/> from the buffer (position at offset 0,
    /// rotation at offset 8), then runs the base implementation.
    /// </summary>
    public override void Reconstruct()
    {
        PlayerState.Poisiton = GetVector2FromBuffer(0);
        PlayerState.Rotation = GetFloatFromBuffer(8);
        base.Reconstruct();
    }

    /// <summary>
    /// Creates the buffer and writes <see cref="PlayerState"/> into it (position
    /// at offset 0, rotation at offset 8), then runs the base implementation.
    /// </summary>
    public override void Deconstruct()
    {
        CreateBuffer();
        AddToBuffer(PlayerState.Poisiton, 0);
        AddToBuffer(PlayerState.Rotation, 8);
        base.Deconstruct();
    }

    /// <summary>Hands this message to the processor's player-state handler.</summary>
    /// <param name="messageProcessor">
    /// Cast to <c>MessageProcessor</c>; any other type makes the cast throw.
    /// </param>
    public override void HandleMessage(object messageProcessor)
    {
        MessageProcessor processor = (MessageProcessor)messageProcessor;
        processor.ProcessPlayerStateMessage(this);
    }
}

Message.GetVector2FromBuffer(int bufferlocation) — this is where the exception is thrown, because this.Buffer is byte[0] when it should be byte[20].

    /// <summary>
    /// Decodes two consecutive 4-byte floats from <c>Buffer</c> into a <c>Vector2</c>.
    /// </summary>
    /// <param name="bufferlocation">
    /// Offset of the first float, relative to the end of the header
    /// (<c>Message.HeaderSize</c> is added to it).
    /// </param>
    /// <returns>A vector built from the float at the offset and the float 4 bytes after it.</returns>
    public Vector2 GetVector2FromBuffer(int bufferlocation)
    {
        int start = Message.HeaderSize + bufferlocation;

        float first = BitConverter.ToSingle(this.Buffer, start);
        float second = BitConverter.ToSingle(this.Buffer, start + 4);

        return new Vector2(first, second);
    }
Was it helpful?

Solution

So this was a hard problem to solve. As far as I can tell, I was simply receiving random bytes, so I changed my "Message" quite a bit. There is now a header buffer and a data buffer. The entire message is enclosed by a beginning marker and an ending marker, while the header and data buffers are each enclosed by their own, different markers. This allows me to tell when I have received bad data so that I can discard the message. If a message does get discarded, then on the next message read, instead of just checking whether the first 4 bytes received are the opening marker, it reads byte by byte until the last 4 bytes read are equal to the marker.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top