C#:实施 NetworkStream.Peek?
-
25-09-2019 - |
题
目前,还没有一个 NetworkStream.Peek
C# 中的方法。实现这种功能的最佳方法是什么 NetworkStream.ReadByte
除了返回的 byte
实际上并没有从 Stream
?
解决方案
如果不需要实际检索字节,可以参考 DataAvailable
财产。
否则,你可以用一个包裹它 StreamReader
并调用它的 Peek
方法。
请注意,由于延迟问题,这两种方法对于从网络流中读取数据都不是特别可靠。数据可能立即变得可用(存在于读缓冲区中) 后 你偷看。
我不确定你打算用这个做什么,但是 Read
方法上 NetworkStream
是一个阻塞调用,因此您实际上不需要检查状态,即使您正在接收块。如果您试图在从流读取时保持应用程序响应,则应使用线程或异步调用来接收数据。
编辑:根据 这个帖子, StreamReader.Peek
是越野车 NetworkStream
, ,或者至少有未记录的行为,所以如果您选择走这条路,请务必小心。
更新 - 对评论的回应
“查看”实际流本身的概念实际上是不可能的;它只是一个流,一旦接收到字节,它就不再在流上。有些流支持查找,因此您可以从技术上重新读取该字节,但是 NetworkStream
不是其中之一。
窥视仅适用于将流读入缓冲区时;一旦数据位于缓冲区中,那么查看就很容易,因为您只需检查缓冲区中当前位置的任何内容。这就是为什么一个 StreamReader
能够做到这一点;不 Stream
类一般都会有自己的 Peek
方法。
现在,具体针对这个问题,我质疑这是否真的是正确的答案。我理解动态选择处理流的方法的想法,但是你知道吗? 真的 需要在原始流上执行此操作吗?你能不能先将流读入字节数组,甚至将其复制到 MemoryStream
, ,并从那时起处理它?
我看到的主要问题是,如果当您从网络流中读取数据时发生问题,数据就会消失。但如果您先将其读入临时位置,则可以对其进行调试。您可以找出数据是什么以及尝试处理数据的对象为何中途失败。
一般来说,您想做的第一件事是 NetworkStream
将其读入本地缓冲区。我能想到不这样做的唯一原因是,如果您正在读取大量数据 - 即使这样,如果内存不适合文件系统,我可能会考虑使用文件系统作为中间缓冲区。
我不知道你的具体要求,但根据我到目前为止所学到的,我的建议是:不要尝试直接从 NetworkStream
除非有令人信服的理由这样做。考虑首先将数据读入内存或磁盘,然后处理副本。
其他提示
我遇到了同样的“查看幻数,然后决定将流发送到哪个流处理器”的要求,不幸的是,我无法通过传递已经消耗的字节来解决这个问题 - 正如对 Aaronaught 答案的评论中所建议的那样在单独的参数中进入流处理方法,因为这些方法是给定的,它们期望 System.IO.Stream 而不是其他任何东西。
我通过创建一个或多或少通用的解决了这个问题 可查看流 包装 Stream 的类。它适用于 NetworkStreams,也适用于任何其他 Stream,前提是您 Stream.CanRead 它。
编辑
或者,您可以使用全新的 ReadSeekableStream
并做
var readSeekableStream = new ReadSeekableStream(networkStream, /* >= */ count);
...
readSeekableStream.Read(..., count);
readSeekableStream.Seek(-count, SeekOrigin.Current);
无论如何,这里来了 PeekableStream
:
/// <summary>
/// PeekableStream wraps a Stream and can be used to peek ahead in the underlying stream,
/// without consuming the bytes. In other words, doing Peek() will allow you to look ahead in the stream,
/// but it won't affect the result of subsequent Read() calls.
///
/// This is sometimes necessary, e.g. for peeking at the magic number of a stream of bytes and decide which
/// stream processor to hand over the stream.
/// </summary>
public class PeekableStream : Stream
{
private readonly Stream underlyingStream;
private readonly byte[] lookAheadBuffer;
private int lookAheadIndex;
public PeekableStream(Stream underlyingStream, int maxPeekBytes)
{
this.underlyingStream = underlyingStream;
lookAheadBuffer = new byte[maxPeekBytes];
}
protected override void Dispose(bool disposing)
{
if (disposing)
underlyingStream.Dispose();
base.Dispose(disposing);
}
/// <summary>
/// Peeks at a maximum of count bytes, or less if the stream ends before that number of bytes can be read.
///
/// Calls to this method do not influence subsequent calls to Read() and Peek().
///
/// Please note that this method will always peek count bytes unless the end of the stream is reached before that - in contrast to the Read()
/// method, which might read less than count bytes, even though the end of the stream has not been reached.
/// </summary>
/// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and
/// (offset + number-of-peeked-bytes - 1) replaced by the bytes peeked from the current source.</param>
/// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data peeked from the current stream.</param>
/// <param name="count">The maximum number of bytes to be peeked from the current stream.</param>
/// <returns>The total number of bytes peeked into the buffer. If it is less than the number of bytes requested then the end of the stream has been reached.</returns>
public virtual int Peek(byte[] buffer, int offset, int count)
{
if (count > lookAheadBuffer.Length)
throw new ArgumentOutOfRangeException("count", "must be smaller than peekable size, which is " + lookAheadBuffer.Length);
while (lookAheadIndex < count)
{
int bytesRead = underlyingStream.Read(lookAheadBuffer, lookAheadIndex, count - lookAheadIndex);
if (bytesRead == 0) // end of stream reached
break;
lookAheadIndex += bytesRead;
}
int peeked = Math.Min(count, lookAheadIndex);
Array.Copy(lookAheadBuffer, 0, buffer, offset, peeked);
return peeked;
}
public override bool CanRead { get { return true; } }
public override long Position
{
get
{
return underlyingStream.Position - lookAheadIndex;
}
set
{
underlyingStream.Position = value;
lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Position, as that might throw NotSupportedException,
// in which case we don't want to change the lookAhead status
}
}
public override int Read(byte[] buffer, int offset, int count)
{
int bytesTakenFromLookAheadBuffer = 0;
if (count > 0 && lookAheadIndex > 0)
{
bytesTakenFromLookAheadBuffer = Math.Min(count, lookAheadIndex);
Array.Copy(lookAheadBuffer, 0, buffer, offset, bytesTakenFromLookAheadBuffer);
count -= bytesTakenFromLookAheadBuffer;
offset += bytesTakenFromLookAheadBuffer;
lookAheadIndex -= bytesTakenFromLookAheadBuffer;
if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
// copying into same array should be fine, according to http://msdn.microsoft.com/en-us/library/z50k9bft(v=VS.90).aspx :
// "If sourceArray and destinationArray overlap, this method behaves as if the original values of sourceArray were preserved
// in a temporary location before destinationArray is overwritten."
Array.Copy(lookAheadBuffer, lookAheadBuffer.Length - bytesTakenFromLookAheadBuffer + 1, lookAheadBuffer, 0, lookAheadIndex);
}
return count > 0
? bytesTakenFromLookAheadBuffer + underlyingStream.Read(buffer, offset, count)
: bytesTakenFromLookAheadBuffer;
}
public override int ReadByte()
{
if (lookAheadIndex > 0)
{
lookAheadIndex--;
byte firstByte = lookAheadBuffer[0];
if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
Array.Copy(lookAheadBuffer, 1, lookAheadBuffer, 0, lookAheadIndex);
return firstByte;
}
else
{
return underlyingStream.ReadByte();
}
}
public override long Seek(long offset, SeekOrigin origin)
{
long ret = underlyingStream.Seek(offset, origin);
lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Seek(), as that might throw NotSupportedException,
// in which case we don't want to change the lookAhead status
return ret;
}
// from here on, only simple delegations to underlyingStream
public override bool CanSeek { get { return underlyingStream.CanSeek; } }
public override bool CanWrite { get { return underlyingStream.CanWrite; } }
public override bool CanTimeout { get { return underlyingStream.CanTimeout; } }
public override int ReadTimeout { get { return underlyingStream.ReadTimeout; } set { underlyingStream.ReadTimeout = value; } }
public override int WriteTimeout { get { return underlyingStream.WriteTimeout; } set { underlyingStream.WriteTimeout = value; } }
public override void Flush() { underlyingStream.Flush(); }
public override long Length { get { return underlyingStream.Length; } }
public override void SetLength(long value) { underlyingStream.SetLength(value); }
public override void Write(byte[] buffer, int offset, int count) { underlyingStream.Write(buffer, offset, count); }
public override void WriteByte(byte value) { underlyingStream.WriteByte(value); }
}
如果您有权访问 Socket
对象,你可以尝试 接收方式, 通过 SocketFlags.Peek
. 。这类似于 MSG_PEEK
可以传递给的标志 recv
在 BSD Sockets 或 Winsock 中调用。
这是一个非常简单的 PeekStream
允许您仅在流开头查看一定数量的字节(而不是随时查看)的实现。所查看的字节作为 Stream
自己,以尽量减少对现有代码的更改。
使用方法如下:
Stream nonSeekableStream = ...;
PeekStream peekStream = new PeekStream(nonSeekableStream, 30); // Peek max 30 bytes
Stream initialBytesStream = peekStream.GetInitialBytesStream();
ParseHeaders(initialBytesStream); // Work on initial bytes of nonSeekableStream
peekStream.Read(...) // Read normally, the read will start from the beginning
GetInitialBytesStream()
返回一个可查找流,其中最多包含 peekSize
底层流的初始字节(如果流短于则更少 peekSize
).
由于其简单性,读取 PeekStream 应该只比直接读取底层流稍微慢一点(如果有的话)。
public class PeekStream : Stream
{
private Stream m_stream;
private byte[] m_buffer;
private int m_start;
private int m_end;
public PeekStream(Stream stream, int peekSize)
{
if (stream == null)
{
throw new ArgumentNullException("stream");
}
if (!stream.CanRead)
{
throw new ArgumentException("Stream is not readable.");
}
if (peekSize < 0)
{
throw new ArgumentOutOfRangeException("peekSize");
}
m_stream = stream;
m_buffer = new byte[peekSize];
m_end = stream.Read(m_buffer, 0, peekSize);
}
public override bool CanRead
{
get
{
return true;
}
}
public override bool CanWrite
{
get
{
return false;
}
}
public override bool CanSeek
{
get
{
return false;
}
}
public override long Length
{
get
{
throw new NotSupportedException();
}
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public MemoryStream GetInitialBytesStream()
{
return new MemoryStream(m_buffer, 0, m_end, false);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
// Validate arguments
if (buffer == null)
{
throw new ArgumentNullException("buffer");
}
if (offset < 0)
{
throw new ArgumentOutOfRangeException("offset");
}
if (offset + count > buffer.Length)
{
throw new ArgumentOutOfRangeException("count");
}
int totalRead = 0;
// Read from buffer
if (m_start < m_end)
{
int toRead = Math.Min(m_end - m_start, count);
Array.Copy(m_buffer, m_start, buffer, offset, toRead);
m_start += toRead;
offset += toRead;
count -= toRead;
totalRead += toRead;
}
// Read from stream
if (count > 0)
{
totalRead += m_stream.Read(buffer, offset, count);
}
// Return total bytes read
return totalRead;
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
public override int ReadByte()
{
if (m_start < m_end)
{
return m_buffer[m_start++];
}
else
{
return m_stream.ReadByte();
}
}
public override void Flush()
{
m_stream.Flush();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
m_stream.Dispose();
}
base.Dispose(disposing);
}
}
免责声明:上面的 PeekStream 取自一个工作程序,但没有经过全面测试,因此可能包含错误。它对我有用,但您可能会发现一些失败的极端情况。