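I'm running into a problem where reading from an HttpResponseStream fails because the StreamReader I wrap around it reads faster than the response stream receives the actual response. I'm retrieving a reasonably small file (around 60K), but the parser that turns the response into an actual object fails because it hits an unexpected character (code 65535). From experience, I know that this is the character you get when you read from a StreamReader and no further characters are available.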
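To show where the 65535 comes from: Read() and Peek() return -1 when nothing is available, and casting that -1 to char gives U+FFFF. Here is a minimal sketch of that effect; the class and method names are made up for illustration and are not part of any library mentioned here.

```csharp
using System;
using System.IO;

public static class EndOfInputDemo
{
    // Reads one character past the end of a reader and returns its code,
    // the way a parser that ignores the -1 sentinel would see it.
    public static int CodeSeenPastEnd(string text)
    {
        using (TextReader reader = new StringReader(text))
        {
            // Consume everything that is actually there
            while (reader.Peek() != -1) reader.Read();

            // Read() now returns -1; an unchecked cast turns that into U+FFFF
            char c = unchecked((char)reader.Read());
            return (int)c;
        }
    }
}
```

Calling `EndOfInputDemo.CodeSeenPastEnd("abc")` yields 65535, which matches the character code the parser complains about.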

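For the record, I know that the content being returned is valid and will parse correctly, since the failure occurs at a different point in the file each time I run the code. It is the parser.Load() line in the following code that fails.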

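Is there a way to ensure I have read all the content before attempting to parse it, short of copying the response stream into a MemoryStream or a string and then processing that?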

    /// <summary>
    /// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries
    /// </summary>
    /// <param name="sparqlQuery">SPARQL Query String</param>
    /// <returns>RDF Graph</returns>
    public Graph QueryWithResultGraph(String sparqlQuery)
    {
        try
        {
            //Build the Query URI
            StringBuilder queryUri = new StringBuilder();
            queryUri.Append(this._endpoint.ToString());
            queryUri.Append("?query=");
            queryUri.Append(Uri.EscapeDataString(sparqlQuery));

            if (!this._defaultGraphUri.Equals(String.Empty))
            {
                queryUri.Append("&default-graph-uri=");
                queryUri.Append(Uri.EscapeUriString(this._defaultGraphUri));
            }

            //Make the Query via HTTP
            HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false);

            //Set up an Empty Graph ready
            Graph g = new Graph();
            g.BaseURI = this._endpoint;

            //Parse into a Graph based on Content Type
            String ctype = httpResponse.ContentType;
            IRDFReader parser = MIMETypesHelper.GetParser(ctype);
            parser.Load(g, new StreamReader(httpResponse.GetResponseStream()));

            return g;
        }
        catch (UriFormatException uriEx)
        {
            //URI Format Invalid
            throw new Exception("The format of the URI was invalid", uriEx);
        }
        catch (WebException webEx)
        {
            //Some sort of HTTP Error occurred
            throw new Exception("A HTTP Error occurred", webEx);
        }
        catch (RDFException)
        {
            //Some problem with the RDF or Parsing thereof
            throw;
        }
        catch (Exception)
        {
            //Other Exception
            throw;
        }
    }

    /// <summary>
    /// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint
    /// </summary>
    /// <param name="target">URI to make Request to</param>
    /// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param>
    /// <returns>HTTP Response</returns>
    private HttpWebResponse DoQuery(Uri target, bool sparqlOnly)
    {
        //Expect errors in this function to be handled by the calling function

        //Set-up the Request
        HttpWebRequest httpRequest;
        HttpWebResponse httpResponse;
        httpRequest = (HttpWebRequest)WebRequest.Create(target);

        //Use HTTP GET/POST according to user set preference
        if (!sparqlOnly)
        {
            httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader();
            //For the time being drop the application/json as this doesn't play nice with Virtuoso
            httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty);
        }
        else
        {
            httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader();
        }
        httpRequest.Method = this._httpMode;
        httpRequest.Timeout = this._timeout;

        //HTTP Debugging
        if (Options.HTTPDebugging)
        {
            Tools.HTTPDebugRequest(httpRequest);
        }

        httpResponse = (HttpWebResponse)httpRequest.GetResponse();

        //HTTP Debugging
        if (Options.HTTPDebugging)
        {
            Tools.HTTPDebugResponse(httpResponse);
        }

        return httpResponse;
    }

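Edit

To clarify, as I have already said, this is not a bug in the parser; it is a problem of the StreamReader reading faster than the response stream provides data. I can work around it by doing the following, but I would like suggestions for better or more elegant solutions: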

            //Parse into a Graph based on Content Type
            String ctype = httpResponse.ContentType;
            IRDFReader parser = MIMETypesHelper.GetParser(ctype);
            Stream response = httpResponse.GetResponseStream();
            MemoryStream temp = new MemoryStream();
            Tools.StreamCopy(response, temp);
            response.Close();
            temp.Seek(0, SeekOrigin.Begin);
            parser.Load(g, new StreamReader(temp));
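The body of Tools.StreamCopy is not shown here. As a rough sketch of what such a helper could look like (an assumption, not the actual implementation), it only needs to loop until Read() returns zero, because zero is the only reliable end-of-stream signal. The name StreamCopySketch is hypothetical.

```csharp
using System;
using System.IO;

public static class StreamCopySketch
{
    // Hypothetical stand-in for Tools.StreamCopy: copies everything from input to output.
    public static void Copy(Stream input, Stream output)
    {
        byte[] chunk = new byte[4096];
        int read;
        // Read() may return fewer bytes than requested; only 0 means end of stream
        while ((read = input.Read(chunk, 0, chunk.Length)) > 0)
        {
            output.Write(chunk, 0, read);
        }
    }
}
```

On .NET 4.0 and later, the built-in Stream.CopyTo() does the same job.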

Edit 2

The BlockingStreamReader class, as per Eamon's suggestion:

/// <summary>
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams)
/// </summary>
public sealed class BlockingStreamReader : StreamReader
{
    private bool _peeked = false;
    private int _peekChar = -1;

    public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { }

    public BlockingStreamReader(Stream stream) : base(stream) { }

    public override int Read()
    {
        if (this._peeked)
        {
            this._peeked = false;
            return this._peekChar;
        }
        else
        {
            if (this.EndOfStream) return -1;

            char[] cs = new char[1];
            base.ReadBlock(cs, 0, 1);

            return cs[0];
        }
    }

    public override int Peek()
    {
        if (this._peeked)
        {
            return this._peekChar;
        }
        else
        {
            if (this.EndOfStream) return -1;

            this._peeked = true;

            char[] cs = new char[1];
            base.ReadBlock(cs, 0, 1);

            this._peekChar = cs[0];
            return this._peekChar;
        }
    }

    public new bool EndOfStream
    {
        get
        {
            return (base.EndOfStream && !this._peeked);
        }
    }
}

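Edit 3

Here is a much improved solution that can wrap any TextReader and provides an EndOfStream property. It uses an internal buffer that is filled by calling ReadBlock() on the wrapped TextReader. All of the reader's Read() methods are defined in terms of this buffer, and the buffer size is configurable: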

/// <summary>
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency.
/// </summary>
/// <remarks>
/// <para>
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data.  All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see>
/// </para>
/// </remarks>
public sealed class BlockingTextReader : TextReader
{
    private char[] _buffer;
    private int _pos = -1;
    private int _bufferAmount = -1;
    private bool _finished = false;
    private TextReader _reader;

    public const int DefaultBufferSize = 1024;

    public BlockingTextReader(TextReader reader, int bufferSize)
    {
        if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader");
        if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize");
        this._reader = reader;
        this._buffer = new char[bufferSize];
    }

    public BlockingTextReader(TextReader reader)
        : this(reader, DefaultBufferSize) { }

    public BlockingTextReader(Stream input, int bufferSize)
        : this(new StreamReader(input), bufferSize) { }

    public BlockingTextReader(Stream input)
        : this(new StreamReader(input)) { }

    private void FillBuffer()
    {
        this._pos = -1;
        if (this._finished)
        {
            this._bufferAmount = 0;
        }
        else
        {
            this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length);
            if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true;
        }
    }

    public override int ReadBlock(char[] buffer, int index, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (index < 0) throw new ArgumentOutOfRangeException("index", "Index must be >= 0");
        if (count < 0) throw new ArgumentOutOfRangeException("count", "Count must be >= 0");
        if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small");

        //_pos is the index of the last character consumed (-1 if none), the same convention Read() and Peek() use
        int copied = 0;
        while (copied < count)
        {
            if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
            {
                //Internal buffer is exhausted: stop at the end of the input, otherwise refill
                if (this._finished) break;
                this.FillBuffer();
                if (this.EndOfStream) break;
            }

            //Copy as much as is buffered, up to what is still wanted
            int available = this._bufferAmount - (this._pos + 1);
            int toCopy = Math.Min(available, count - copied);
            Array.Copy(this._buffer, this._pos + 1, buffer, index + copied, toCopy);
            copied += toCopy;
            this._pos += toCopy;
        }
        return copied;
    }

    public override int Read(char[] buffer, int index, int count)
    {
        return this.ReadBlock(buffer, index, count);
    }

    public override int Read()
    {
        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return -1;
            }
            else
            {
                return -1;
            }
        }

        this._pos++;
        return (int)this._buffer[this._pos];
    }

    public override int Peek()
    {
        if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
        {
            if (!this._finished)
            {
                this.FillBuffer();
                if (this.EndOfStream) return -1;
            }
            else
            {
                return -1;
            }
        }

        return (int)this._buffer[this._pos + 1];
    }

    public bool EndOfStream
    {
        get
        {
            return this._finished && (this._pos >= this._bufferAmount - 1);
        }
    }

    public override void Close()
    {
        this._reader.Close();
    }

    protected override void Dispose(bool disposing)
    {
        this.Close();
        this._reader.Dispose();
        base.Dispose(disposing);
    }
}

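Solution

Without knowing the specifics of the parser you're using, I can only guess at the bug, but there is a fairly easy-to-make mistake that the .NET framework I/O libraries almost encourage you to make...

Are you aware that Streams and TextReaders may read fewer bytes/characters than requested?

In particular, the documentation for TextReader.Read(char[] buffer, int index, int count) says: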

  

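> **Return Value**
>
> Type: System.Int32
>
> The number of characters that have been read. The number will be **less than** or equal to count, depending on whether the data is available within the stream. This method returns zero if called when no more characters are left to read.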

Emphasis mine.

For example, if you call reader.Read(buffer, 0, 100), you cannot assume that 100 characters have been read.
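A caller that really needs count characters therefore has to loop. Below is a minimal sketch of that loop. ReadFully and TrickleReader are names made up for this example, and TrickleReader only imitates a slow source by handing out at most three characters per call.

```csharp
using System;
using System.IO;

// Hypothetical reader that imitates a slow source: at most 3 characters per Read call.
public sealed class TrickleReader : TextReader
{
    private readonly string _text;
    private int _next;

    public TrickleReader(string text) { _text = text; }

    public override int Read(char[] buffer, int index, int count)
    {
        int n = Math.Min(Math.Min(count, 3), _text.Length - _next);
        _text.CopyTo(_next, buffer, index, n);
        _next += n;
        return n;
    }
}

public static class ReadHelpers
{
    // Keeps calling Read until count characters have arrived or the input ends.
    public static int ReadFully(TextReader reader, char[] buffer, int index, int count)
    {
        int total = 0;
        while (total < count)
        {
            int n = reader.Read(buffer, index + total, count - total);
            if (n == 0) break; // zero is the only end-of-input signal
            total += n;
        }
        return total;
    }
}
```

With a ten-character input, a single Read(buffer, 0, 10) on the TrickleReader hands back only three characters, while ReadFully returns all ten. This loop is essentially what TextReader.ReadBlock() already does for you.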

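Edit: It is very likely that the parser assumes it always gets the full count, and that explains the behavior you observe: if you cache the stream completely in a MemoryStream, there will always be enough characters to fulfill the request. If you don't, the parser will receive fewer characters than it asked for at unpredictable times, whenever the underlying stream is "slow".

Edit 2: You can fix your bug by replacing all instances of TextReader.Read() in the parser with TextReader.ReadBlock().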
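The difference is easy to observe with a stream that delivers its data in small pieces. In the sketch below, ChunkedStream is a made-up test double (not a framework class) that returns at most chunkSize bytes per call, roughly the way a network stream might.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical test double: a read-only stream that returns at most chunkSize bytes per Read.
public sealed class ChunkedStream : Stream
{
    private readonly MemoryStream _inner;
    private readonly int _chunkSize;

    public ChunkedStream(byte[] data, int chunkSize)
    {
        _inner = new MemoryStream(data);
        _chunkSize = chunkSize;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return _inner.Read(buffer, offset, Math.Min(count, _chunkSize));
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

public static class ReadVersusReadBlock
{
    // Returns how many characters a single call delivered when 100 were requested.
    public static int CharsFrom(bool useReadBlock)
    {
        byte[] data = Encoding.ASCII.GetBytes(new string('a', 500));
        using (StreamReader reader = new StreamReader(new ChunkedStream(data, 10), Encoding.ASCII))
        {
            char[] buffer = new char[100];
            return useReadBlock
                ? reader.ReadBlock(buffer, 0, 100)  // loops until 100 characters or end of input
                : reader.Read(buffer, 0, 100);      // may stop at whatever the stream had ready
        }
    }
}
```

CharsFrom(true) returns 100 because ReadBlock() keeps reading until the request is satisfied. CharsFrom(false) is only guaranteed to return something between 1 and 100, and with a trickling stream it will typically be well short of 100.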

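Other tips

To support a blocking read scenario, you can subclass TextReader rather than StreamReader. This avoids the issues with EndOfStream, and it means you can make any reader blocking, not just StreamReaders: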

public sealed class BlockingReader : TextReader
{
    bool hasPeeked;
    int peekChar;
    readonly TextReader reader;

    public BlockingReader(TextReader reader) { this.reader = reader; }

    public override int Read()
    {
        if (!hasPeeked)
            return reader.Read();
        hasPeeked = false;
        return peekChar;
    }

    public override int Peek()
    {
        if (!hasPeeked)
        {
            peekChar = reader.Read();
            hasPeeked = true;
        }
        return peekChar;
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (index < 0)
            throw new ArgumentOutOfRangeException("index");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if ((buffer.Length - index) < count)
            throw new ArgumentException("Buffer too small");

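        int peekCharsRead = 0;
        if (hasPeeked && count > 0)
        {
            // A peeked -1 means end of input: report zero characters instead of storing (char)-1
            if (peekChar == -1)
                return 0;
            buffer[index] = (char)peekChar;
            hasPeeked = false;
            index++;
            count--;
            peekCharsRead++;
        }

        return peekCharsRead + reader.ReadBlock(buffer, index, count);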
    }

    protected override void Dispose(bool disposing)
    {
        try
        {
            if (disposing)
                reader.Dispose();
        }
        finally
        {
            base.Dispose(disposing);
        }
    }
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow