Pergunta

Eu estou correndo em um problema onde a leitura de um HttpResponseStream falha porque o StreamReader que eu estou enrolando em torno de lê mais rápido que o fluxo de resposta recebe a resposta real. Eu estou recuperando um arquivo de tamanho razoavelmente pequeno (cerca de 60k), mas o Analisador que processa a resposta em um objeto real falha porque ela atinge um caráter inesperado (código 65535), que a partir da experiência eu sei que é o personagem produzido quando você lê a partir de um StreamReader e não há mais caracteres disponíveis.

Para o registro eu sei que o ser conteúdo retornado é válida e irá analisar corretamente desde o fracasso ocorre em diferentes pontos no arquivo cada vez que eu executar o código. É a linha parser.Load () na seguinte onde ele falhar.

Existe uma maneira para garantir que eu tenha lido todo o conteúdo antes de tentar analisá-lo curto de copiar o fluxo de resposta em um MemoryStream ou string e depois processá-lo?

    /// <summary>
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries
    /// </summary>
    /// <param name="sparqlQuery">SPARQL Query String</param>
    /// <returns>RDF Graph</returns>
    public Graph QueryWithResultGraph(String sparqlQuery)
    {
        try
        {
            //Build the Query URI
            StringBuilder queryUri = new StringBuilder();
            queryUri.Append(this._endpoint.ToString());
            queryUri.Append("?query=");
            queryUri.Append(Uri.EscapeDataString(sparqlQuery));

            if (!this._defaultGraphUri.Equals(String.Empty))
            {
                queryUri.Append("&default-graph-uri=");
                queryUri.Append(Uri.EscapeUriString(this._defaultGraphUri));
            }

            //Make the Query via HTTP
            HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false);

            //Set up an Empty Graph ready
            Graph g = new Graph();
            g.BaseURI = this._endpoint;

            //Parse into a Graph based on Content Type
            String ctype = httpResponse.ContentType;
            IRDFReader parser = MIMETypesHelper.GetParser(ctype);
            parser.Load(g, new StreamReader(httpResponse.GetResponseStream()));

            return g;
        }
        catch (UriFormatException uriEx)
        {
            //URI Format Invalid
            throw new Exception("The format of the URI was invalid", uriEx);
        }
        catch (WebException webEx)
        {
            //Some sort of HTTP Error occurred
            throw new Exception("A HTTP Error occurred", webEx);
        }
        catch (RDFException)
        {
            //Some problem with the RDF or Parsing thereof
            throw;
        }
        catch (Exception)
        {
            //Other Exception
            throw;
        }
    }

    /// <summary>
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint
    /// </summary>
    /// <param name="target">URI to make Request to</param>
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param>
    /// <returns>HTTP Response</returns>
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly)
    {
        //Expect errors in this function to be handled by the calling function

        //Set-up the Request
        HttpWebRequest httpRequest;
        HttpWebResponse httpResponse;
        httpRequest = (HttpWebRequest)WebRequest.Create(target);

        //Use HTTP GET/POST according to user set preference
        if (!sparqlOnly)
        {
            httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader();
            //For the time being drop the application/json as this doesn't play nice with Virtuoso
            httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty);
        }
        else
        {
            httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader();
        }
        httpRequest.Method = this._httpMode;
        httpRequest.Timeout = this._timeout;

        //HTTP Debugging
        if (Options.HTTPDebugging)
        {
            Tools.HTTPDebugRequest(httpRequest);
        }

        httpResponse = (HttpWebResponse)httpRequest.GetResponse();

        //HTTP Debugging
        if (Options.HTTPDebugging)
        {
            Tools.HTTPDebugResponse(httpResponse);
        }

        return httpResponse;
    }

Editar

Para esclarecer o que já foi dito esta é não um erro no analisador, esta é uma questão do StreamReader lendo mais rápido do que o fluxo de resposta fornece os dados. I pode contornar este problema fazendo o seguinte, mas gostaria de sugestões de soluções de melhor ou mais elegante:

            //Parse into a Graph based on Content Type
            String ctype = httpResponse.ContentType;
            IRDFReader parser = MIMETypesHelper.GetParser(ctype);
            Stream response = httpResponse.GetResponseStream();
            MemoryStream temp = new MemoryStream();
            Tools.StreamCopy(response, temp);
            response.Close();
            temp.Seek(0, SeekOrigin.Begin);
            parser.Load(g, new StreamReader(temp));

Editar 2

classe BlockingStreamReader como por sugestão de Eamon:

/// <summary>
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams)
/// </summary>
public sealed class BlockingStreamReader : StreamReader
{
    private bool _peeked = false;
    private int _peekChar = -1;

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { }

    public BlockingStreamReader(Stream stream) : base(stream) { }

    public override int Read()
    {
        if (this._peeked)
        {
            this._peeked = false;
            return this._peekChar;
        }
        else
        {
            if (this.EndOfStream) return -1;

            char[] cs = new char[1];
            base.ReadBlock(cs, 0, 1);

            return cs[0];
        }
    }

    public override int Peek()
    {
        if (this._peeked)
        {
            return this._peekChar;
        }
        else
        {
            if (this.EndOfStream) return -1;

            this._peeked = true;

            char[] cs = new char[1];
            base.ReadBlock(cs, 0, 1);

            this._peekChar = cs[0];
            return this._peekChar;
        }
    }

    public new bool EndOfStream
    {
        get
        {
            return (base.EndOfStream && !this._peeked);
        }
    }
}

Editar 3

Aqui é uma solução muito melhor que pode envolver qualquer TextReader e fornecer uma propriedade EndOfStream. Ele usa um tampão interno, que é preenchido usando ReadBlock() no TextReader envolvido. Todos os métodos do leitor Read () pode ser definido o utilizando este tampão, o tamanho do buffer é configurável:

    /// <summary>
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency.
/// </summary>
/// <remarks>
/// <para>
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data.  All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see>
/// </para>
/// </remarks>
public sealed class BlockingTextReader : TextReader
{
    private char[] _buffer;
    private int _pos = -1;
    private int _bufferAmount = -1;
    private bool _finished = false;
    private TextReader _reader;

    public const int DefaultBufferSize = 1024;

    public BlockingTextReader(TextReader reader, int bufferSize)
    {
        if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader");
        if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize");
        this._reader = reader;
        this._buffer = new char[bufferSize];
    }

    public BlockingTextReader(TextReader reader)
        : this(reader, DefaultBufferSize) { }

    public BlockingTextReader(Stream input, int bufferSize)
        : this(new StreamReader(input), bufferSize) { }

    public BlockingTextReader(Stream input)
        : this(new StreamReader(input)) { }

    private void FillBuffer()
    {
        this._pos = -1;
        if (this._finished)
        {
            this._bufferAmount = 0;
        }
        else
        {
            this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length);
            if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true;
        }
    }

    public override int ReadBlock(char[] buffer, int index, int count)
    {
        if (count == 0) return 0;
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (index < 0) throw new ArgumentException("index", "Index must be >= 0");
        if (count < 0) throw new ArgumentException("count", "Count must be >= 0");
        if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small");

        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return 0;
            }
            else
            {
                return 0;
            }
        }

        this._pos = Math.Max(0, this._pos);
        if (count <= this._bufferAmount - this._pos)
        {
            //If we have sufficient things buffered to fufill the request just copy the relevant stuff across
            Array.Copy(this._buffer, this._pos, buffer, index, count);
            this._pos += count;
            return count;
        }
        else
        {
            int copied = 0;
            while (copied < count)
            {
                int available = this._bufferAmount - this._pos;
                if (count < copied + available)
                {
                    //We can finish fufilling this request this round
                    int toCopy = Math.Min(available, count - copied);
                    Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy);
                    copied += toCopy;
                    this._pos += toCopy;
                    return copied;
                }
                else
                {
                    //Copy everything we currently have available
                    Array.Copy(this._buffer, this._pos, buffer, index + copied, available);
                    copied += available;
                    this._pos = this._bufferAmount;

                    if (!this._finished)
                    {
                        //If we haven't reached the end of the input refill our buffer and continue
                        this.FillBuffer();
                        if (this.EndOfStream) return copied;
                        this._pos = 0;
                    }
                    else
                    {
                        //Otherwise we have reached the end of the input so just return what we've managed to copy
                        return copied;
                    }
                }
            }
            return copied;
        }
    }

    public override int Read(char[] buffer, int index, int count)
    {
        return this.ReadBlock(buffer, index, count);
    }

    public override int Read()
    {
        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return -1;
            }
            else
            {
                return -1;
            }
        }

        this._pos++;
        return (int)this._buffer[this._pos];
    }

    public override int Peek()
    {
        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return -1;
            }
            else
            {
                return -1;
            }
        }

        return (int)this._buffer[this._pos + 1];
    }

    public bool EndOfStream
    {
        get
        {
            return this._finished && (this._pos >= this._bufferAmount - 1);
        }
    }

    public override void Close()
    {
        this._reader.Close();
    }

    protected override void Dispose(bool disposing)
    {
        this.Close();
        this._reader.Dispose();
        base.Dispose(disposing);
    }
}
Foi útil?

Solução

Sem saber as especificidades do analisador que você está usando, só posso adivinhar o bug, mas há um bastante fácil de fazer bug o .NET framework I / O libs quase encorajá-lo a fazer ...

Você está ciente do fato de que Córregos e TextReaders pode ler menos bytes / caracteres do que o solicitado?

Em particular, TextReader.Read (char [] Buffer, índice int, int count) 's documentos dizem:

Valor de retorno

Tipo: System .. ::. Int32

O número de caracteres que foram lidos. O número será menos de ou igual a contar, dependendo se os dados estão disponíveis dentro do fluxo . Este método retorna zero se chamado quando há mais personagens são deixados para ler.

grifo meu.

Por exemplo, se você chamar reader.Read (buffer, 0, 100) você não pode assumir que 100 caracteres foram lidos.

Editar: É muito provável que o analisador assume esta; e isso explica o seu comportamento observado : se você armazenar em cache totalmente o fluxo em um MemoryStream, sempre haverá caracteres suficientes para fullfill o pedido - mas se você não fizer isso, o analisador receberá menos caracteres que solicitado em momentos imprevisíveis quando o fluxo subjacente é "lento".

Edit2:. Você pode corrigir seu erro, substituindo todas as instâncias de TextReader.Read () no analisador com TextReader.ReadBlock ()

Outras dicas

Para suportar um cenário de leitura de bloqueio, em vez de subclassificação StreamReader, você pode subclasse TextReader: Isso evita problemas com EndOfStream, e isso significa que você pode fazer qualquer leitor bloqueando - não apenas StreamReaders:

public sealed class BlockingReader : TextReader
{
    bool hasPeeked;
    int peekChar;
    readonly TextReader reader;

    public BlockingReader(TextReader reader) { this.reader = reader; }

    public override int Read()
    {
        if (!hasPeeked)
            return reader.Read();
        hasPeeked = false;
        return peekChar;
    }

    public override int Peek()
    {
        if (!hasPeeked)
        {
            peekChar = reader.Read();
            hasPeeked = true;
        }
        return peekChar;
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (index < 0)
            throw new ArgumentOutOfRangeException("index");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if ((buffer.Length - index) < count)
            throw new ArgumentException("Buffer too small");

        int peekCharsRead = 0;
        if (hasPeeked)
        {
            buffer[index] = (char)peekChar;
            hasPeeked = false;
            index++;
            count--;
            peekCharsRead++;
        }

        return peekCharsRead + reader.ReadBlock(buffer, index, count);
    }

    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
                reader.Dispose();
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}
Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top