Question

I've built a C# application using NamedPipeServerStream/NamedPipeClientStream, and I'm having trouble serializing objects when the client reads too slowly. Here is the minimal example I can put together.

The client (pure consumer):

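NamedPipeClientStream pipeClient;   // created and connected elsewhere (omitted)
StreamReader reader = null;
BinaryFormatter formatter = new BinaryFormatter();
if (useString)
    reader = new StreamReader(pipeClient);
while (true)
{
    Thread.Sleep(5000);   // simulate a slow consumer
    if (useString)
    {
        string line = reader.ReadLine();
    }
    else
    {
        Event evt = (Event)formatter.Deserialize(pipeClient);
    }
}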

The server (pure producer):

while (true)
{
    i++;
    Thread.Sleep(1000);
    if (useStrings)
    {
        StreamWriter writer = new StreamWriter(m_pipeServer);
        writer.WriteLine("START data payload {0} END", i);
        writer.Flush();
    }
    else
    {
        BinaryFormatter formatter = new BinaryFormatter();
        formatter.Serialize(m_pipeServer, new Event(i));
    }

    m_pipeServer.Flush();
    m_pipeServer.WaitForPipeDrain();
}

And "Event" is a simple class with a single property tracking the payload: i.

The behavior I expect is that events simply go missing when the server produces more than the client can read. However, in the string case I get a random ordering of events:

START data payload 0 END
START data payload 1 END
START data payload 2 END
START data payload 4 END
START data payload 15 END
START data payload 16 END
START data payload 24 END
START data payload 3 END
START data payload 35 END
START data payload 34 END
START data payload 17 END

And for the binary serializer I get an exception (this is less surprising):

SerializationException: Binary stream '0' does not contain a valid BinaryHeader. Possible causes are invalid stream or object version change between serialization and deserialization.

Lastly, note that if I remove the call to Sleep on the client, everything works fine: all events are received, in order (as expected).

So I'm trying to figure out how to serialize binary events over a named pipe when the client may read too slowly and miss events. In my scenario, missing events is completely fine. However, I'm surprised that the string events arrive intact but out of order, instead of being truncated (due to buffer rollover) or simply dropped.

The binary formatter case is actually the one I care about. I'm trying to serialize and pass relatively small events (~300 bytes) across a named pipe to multiple consumer programs, but I'm concerned those clients won't be able to keep up with the volume.

How do I properly produce/consume these events across a named pipe if we exhaust the buffer? My desired behavior is simply dropping events that the client can't keep up with.


Solution

I wouldn't trust the transport layer (i.e. the pipe) to drop packets that the client can't keep up with. I would create a circular queue on the client. A dedicated thread would then service the pipe and put messages on the queue. A separate client thread (or multiple threads) would service the queue. Doing it this way, you should be able to keep the pipe clean.
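
As a rough illustration of that thread layout, here is a minimal sketch. The names PipePump, ReadLoop, TryTake and the capacity parameter are made up for this example, and it assumes the line-based string format from the question; the binary case would need its own message framing. One thread drains the stream as fast as it can and keeps only the newest lines, while a slower consumer takes them at its own pace.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical helper (not part of any library): decouples reading the pipe
// from processing the messages.
public sealed class PipePump
{
    private readonly Queue<string> _pending = new Queue<string>();
    private readonly int _capacity;
    private bool _completed;

    public PipePump(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Run this on a dedicated thread. In the question's setup, `source`
    // would be the connected NamedPipeClientStream.
    public void ReadLoop(Stream source)
    {
        using (var reader = new StreamReader(source))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                lock (_pending)
                {
                    // Buffer full: discard the oldest line to make room.
                    if (_pending.Count == _capacity) _pending.Dequeue();
                    _pending.Enqueue(line);
                    Monitor.Pulse(_pending);
                }
            }
        }
        // End of stream: wake any waiting consumers so they can finish.
        lock (_pending) { _completed = true; Monitor.PulseAll(_pending); }
    }

    // Called by the slow consumer. Blocks until a line is available;
    // returns false once the stream has ended and nothing is left.
    public bool TryTake(out string line)
    {
        lock (_pending)
        {
            while (_pending.Count == 0 && !_completed) Monitor.Wait(_pending);
            if (_pending.Count == 0) { line = null; return false; }
            line = _pending.Dequeue();
            return true;
        }
    }
}
```

The reader thread could be started with something like new Thread(() => pump.ReadLoop(pipeClient)) { IsBackground = true }.Start(), after which the consumer loops on pump.TryTake(out line). Because the pipe is always being drained, the server's WaitForPipeDrain should no longer stall behind a slow consumer.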

Since it's a circular queue, newer messages will overwrite older ones. A client reading will always get the oldest message that hasn't yet been processed.
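
The overwrite behavior can be sketched with a fixed-size array, assuming a single lock is acceptable for the expected message rate. DropOldestQueue and its members are hypothetical names for this example, not an existing .NET type.

```csharp
using System;

// Hypothetical fixed-capacity queue: when full, Enqueue overwrites the oldest item.
public sealed class DropOldestQueue<T>
{
    private readonly T[] _items;
    private readonly object _gate = new object();
    private int _head;   // index of the oldest stored item
    private int _count;  // number of items currently stored

    public DropOldestQueue(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _items = new T[capacity];
    }

    // Adds an item. Returns true if an older item had to be dropped to make room.
    public bool Enqueue(T item)
    {
        lock (_gate)
        {
            bool dropped = _count == _items.Length;
            if (dropped)
            {
                // Full: step past the oldest item so its slot can be reused.
                _head = (_head + 1) % _items.Length;
                _count--;
            }
            _items[(_head + _count) % _items.Length] = item;
            _count++;
            return dropped;
        }
    }

    // Removes the oldest item that has not been overwritten, if any.
    public bool TryDequeue(out T item)
    {
        lock (_gate)
        {
            if (_count == 0) { item = default(T); return false; }
            item = _items[_head];
            _items[_head] = default(T); // release the reference
            _head = (_head + 1) % _items.Length;
            _count--;
            return true;
        }
    }
}
```

With a capacity of 3, enqueueing 1 through 5 and then dequeueing yields 3, 4, 5: the two oldest items were overwritten, and the rest come out in order.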

Creating a circular queue is pretty easy, and you can even make it implement IProducerConsumerCollection so that you can use it as the backing store for BlockingCollection.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow