Question

Is there any .NET data structure/combination of classes that allows for byte data to be appended to the end of a buffer but all peeks and reads are from the start, shortening the buffer when I read?

The MemoryStream class seems to do part of this, but I need to maintain separate locations for reading and writing, and it doesn't automatically discard the data at the start after it's read.

An answer has been posted in reply to this question which is basically what I'm trying to do but I'd prefer something I can do asynchronous I/O on in different components of the same process, just like a normal pipe or even a network stream (I need to filter/process the data first).

Was it helpful?

Solution

I'll post a stripped out copy of some logic i wrote for a project at work once. The advantage of this version is that it works with a linked list of buffered data and therefore you dont have to cache huge amounts of memory and/or copy memory around when reading. furthermore, its thread safe and behaves like a network stream, that is: When reading when there is no data available: Wait untill there is data available or timeout. Also, when reading x amounts of bytes and there are only y amounts of bytes, return after reading all bytes. I hope this helps!

    public class SlidingStream : Stream
{
    #region Other stream member implementations

    ...

    #endregion Other stream member implementations

    public SlidingStream()
    {
        ReadTimeout = -1;
    }

    private readonly object _writeSyncRoot = new object();
    private readonly object _readSyncRoot = new object();
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim();

    public int ReadTimeout { get; set; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_dataAvailableResetEvent.Wait(ReadTimeout))
            throw new TimeoutException("No data available");

        lock (_readSyncRoot)
        {
            int currentCount = 0;
            int currentOffset = 0;

            while (currentCount != count)
            {
                ArraySegment<byte> segment = _pendingSegments.First.Value;
                _pendingSegments.RemoveFirst();

                int index = segment.Offset;
                for (; index < segment.Count; index++)
                {
                    if (currentOffset < offset)
                    {
                        currentOffset++;
                    }
                    else
                    {
                        buffer[currentCount] = segment.Array[index];
                        currentCount++;
                    }
                }

                if (currentCount == count)
                {
                    if (index < segment.Offset + segment.Count)
                    {
                        _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index));
                    }
                }

                if (_pendingSegments.Count == 0)
                {
                    _dataAvailableResetEvent.Reset();

                    return currentCount;
                }
            }

            return currentCount;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_writeSyncRoot)
        {
            byte[] copy = new byte[count];
            Array.Copy(buffer, offset, copy, 0, count);

            _pendingSegments.AddLast(new ArraySegment<byte>(copy));

            _dataAvailableResetEvent.Set();
        }   
    }
}

OTHER TIPS

The code can be simpler than in the accepted answer. There is no need to use a for loop.:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}

I tried polishing Polity's code. It's far from optimization, but maybe just works.

public class SlidingStream : Stream {
  public SlidingStream() {
    ReadTimeout = -1;
  }

  private readonly object ReadSync = new object();
  private readonly object WriteSync = new object();
  private readonly ConcurrentQueue<ArraySegment<byte>> PendingSegments
    = new ConcurrentQueue<ArraySegment<byte>>();
  private readonly ManualResetEventSlim DataAvailable = new ManualResetEventSlim(false);
  private ArraySegment<byte>? PartialSegment;

  public new int ReadTimeout;

  public override bool CanRead => true;

  public override bool CanSeek => false;

  public override bool CanWrite => true;

  public override long Length => throw new NotImplementedException();

  public override long Position {
    get => throw new NotImplementedException();
    set => throw new NotImplementedException();
  }

  private bool Closed;

  public override void Close() {
    Closed = true;
    DataAvailable.Set();
    base.Close();
  }

  public override int Read(byte[] buffer, int offset, int count) {
    int msStart = Environment.TickCount;

    lock (ReadSync) {
      int read = 0;

      while (read < count) {
        ArraySegment<byte>? seg = TryDequeue(msStart);
        if (seg == null) {
          return read;
        }

        ArraySegment<byte> segment = seg.GetValueOrDefault();
        int bite = Math.Min(count - read, segment.Count);
        if (bite < segment.Count) {
          PartialSegment = new ArraySegment<byte>(
            segment.Array,
            segment.Offset + bite,
            segment.Count - bite
          );
        }

        Array.Copy(segment.Array, segment.Offset, buffer, offset + read, bite);
        read += bite;
      }

      return read;
    }
  }

  private ArraySegment<byte>? TryDequeue(int msStart) {
    ArraySegment<byte>? ps = PartialSegment;
    if (ps.HasValue) {
      PartialSegment = null;
      return ps;
    }

    DataAvailable.Reset();

    ArraySegment<byte> segment;
    while (!PendingSegments.TryDequeue(out segment)) {
      if (Closed) {
        return null;
      }
      WaitDataOrTimeout(msStart);
    }

    return segment;
  }

  private void WaitDataOrTimeout(int msStart) {
    int timeout;
    if (ReadTimeout == -1) {
      timeout = -1;
    }
    else {
      timeout = msStart + ReadTimeout - Environment.TickCount;
    }

    if (!DataAvailable.Wait(timeout)) {
      throw new TimeoutException("No data available");
    }
  }

  public override void Write(byte[] buffer, int offset, int count) {
    lock (WriteSync) {
      byte[] copy = new byte[count];
      Array.Copy(buffer, offset, copy, 0, count);

      PendingSegments.Enqueue(new ArraySegment<byte>(copy));

      DataAvailable.Set();
    }
  }

  public override void Flush() {
    throw new NotImplementedException();
  }

  public override long Seek(long offset, SeekOrigin origin) {
    throw new NotImplementedException();
  }

  public override void SetLength(long value) {
    throw new NotImplementedException();
  }
}

And here's a hopefully race condition free version using SemaphoreSlim to notify:

public class SlidingStream : Stream
{
    private readonly object _writeLock = new object();
    private readonly object _readLock = new object();
    private readonly ConcurrentQueue<byte[]> _pendingSegments = new ConcurrentQueue<byte[]>();
    private byte[] _extraSegment = null;

    private readonly SemaphoreSlim _smSegmentsAvailable = new SemaphoreSlim(0);

    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_writeLock)
        {
            var copy = new byte[count];
            Array.Copy(buffer, offset, copy, 0, count);

            _pendingSegments.Enqueue(copy);
            _smSegmentsAvailable.Release(1);
        }
    }

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancel)
    {
        Write(buffer, offset, count);
        return Task.CompletedTask;
    }

    public override int Read(byte[] buffer, int offset, int bytesToRead)
    {
        lock (_readLock)
        {
            var bytesRead = 0;

            while (bytesToRead > 0)
            {
                byte[] segment;

                if (_extraSegment != null)
                {
                    segment = _extraSegment;
                    _extraSegment = null;
                }
                else
                {
                    if (_smSegmentsAvailable.CurrentCount == 0 && bytesRead > 0)
                    {
                        return bytesRead;
                    }

                    _smSegmentsAvailable.Wait(_cancel);

                    if (!_pendingSegments.TryDequeue(out segment))
                    {
                        throw new InvalidOperationException("No segment found, despite semaphore");
                    }
                }

                var copyCount = Math.Min(bytesToRead, segment.Length);
                Array.Copy(segment, 0, buffer, offset + bytesRead, copyCount);
                bytesToRead -= copyCount;
                bytesRead += copyCount;

                var extraCount = segment.Length - copyCount;
                if (extraCount > 0)
                {
                    _extraSegment = new byte[extraCount];
                    Array.Copy(segment, copyCount, _extraSegment, 0, extraCount);
                }
            }

            return bytesRead;
        }
    }

    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        //could be extended here with a proper async read
        var result = Read(buffer, offset, count);
        return Task.FromResult(result);
    }

    protected override void Dispose(bool disposing)
    {
        _smSegmentsAvailable.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => true;

    public override long Seek(long offset, SeekOrigin origin)
        => throw new NotSupportedException();

    public override void SetLength(long value)
        => throw new NotSupportedException();

    public override void Flush() {}

    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
}

A late answer but there is a Queue support in .NET Framework since version 2.0. Use ConcurrentQueue for thread safe operations.

I created the below implementation of a Stream which will Read and ReadLines when the bytes become available. Not the best implementation but it should do the job.

public class QueueStream : Stream
{
    protected readonly ConcurrentQueue<byte> Queue = new ConcurrentQueue<byte>();

    public Task? DownloadTask { get; set; }

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => true;

    public override long Length => Queue.Count;

    public override long Position
    {
        get => 0;
        set => throw new NotImplementedException($"{nameof(QueueStream)} is not seekable");
    }

    public override void Flush()
    {
        Queue.Clear();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (buffer.Length < count)
        {
            throw new Exception($"{nameof(buffer)} length is less that the specified {nameof(count)}");
        }

        var index = 0;
        var insertedCount = 0;
        while (Queue.TryDequeue(out var b) && insertedCount < count)
        {
            if (index >= offset)
            {
                buffer[insertedCount++] = b;
            }

            index++;
        }

        return insertedCount;
    }

    public string ReadLines(int numberOfLines = 1)
    {
        var currentLine = 0;
        var stringBuilder = new StringBuilder();

        Func<bool> isFaulted = () => false;
        Func<bool> isCompleted = () => true;

        if (DownloadTask != null)
        {
            isFaulted = () => DownloadTask.IsFaulted;
            isCompleted = () => DownloadTask.IsCompleted;
        }

        while (!isFaulted() && !isCompleted() && currentLine < numberOfLines)
        {
            if (!Queue.TryDequeue(out var byteResult))
            {
                continue;
            }

            if (byteResult == '\r' || byteResult == '\n')
            {
                if (byteResult == '\r')
                {
                    byte peekResult = 0;
                    while (!isFaulted() && !isCompleted() && !Queue.TryPeek(out peekResult))
                    {
                    }

                    if (peekResult == '\n')
                    {
                        Queue.TryDequeue(out _);
                    }
                }

                stringBuilder.Append(Environment.NewLine);
                currentLine++;
                continue;
            }

            stringBuilder.Append((char)byteResult);
        }

        return stringBuilder.ToString();
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        var forEnd = offset + count;
        for (var index = offset; index < forEnd; index++)
        {
            Queue.Enqueue(buffer[index]);
        }
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top