Question

I've written an HTTP server that produces endless HTTP streams consisting of JSON-structured events, similar to Twitter's streaming API. These events are separated by \n (following Server-sent events with Content-Type: text/event-stream) and can vary in length.

The response is

  • chunked (HTTP 1.1 Transfer-Encoding:chunked) due to the endless stream
  • compressed (Content-Encoding: gzip) to save bandwidth.

I want to consume these lines in Python as soon as they arrive and as resource-efficiently as possible, without reinventing the wheel.

As I'm currently using python-requests, do you know how to make it work? If you think python-requests cannot help here, I'm totally open to alternative frameworks/libraries.

My current implementation is based on requests and uses iter_lines(...) to receive the lines. But the chunk_size parameter is tricky. If set to 1, it is very CPU-intensive, since some events can be several kilobytes long. If set to any value above 1, some events get stuck until the next ones arrive and the whole buffer has been filled, and the time between events can be several seconds. I expected chunk_size to be some sort of "maximum number of bytes to receive", as in Unix's recv(...). The corresponding man page says:

The receive calls normally return any data available, up to the requested amount, rather than waiting for receipt of the full amount requested.

But this is obviously not how it works in the requests library. It treats chunk_size more or less as an "exact number of bytes to receive". While looking at the source code, I couldn't identify which part is responsible for that. Maybe httplib's Response or ssl's SSLSocket.
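For comparison, the recv() behaviour quoted from the man page can be observed with a local socket pair. This is only a minimal Python 3 sketch; the socket pair and the sample event are stand-ins for the real server and are not part of the original setup.

```python
import socket

# A connected pair of local stream sockets stands in for server and client.
server_side, client_side = socket.socketpair()

# The "server" sends one short event, far less than the amount requested below.
server_side.sendall(b'{"event": 1}\n')

# recv() returns the data that is available (up to 4096 bytes) instead of
# waiting until the full 4096 bytes have arrived.
data = client_side.recv(4096)

server_side.close()
client_side.close()
```

This is the "maximum number of bytes" semantics that chunk_size was expected to have.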

As a workaround, I tried padding my lines on the server to a multiple of the chunk size. But requests uses the chunk size to fetch bytes from the compressed response stream, so this won't work unless I can pad my lines so that their compressed byte sequence is a multiple of the chunk size. This seems far too hacky.

I've read that Twisted could be used for non-blocking, non-buffered processing of HTTP streams on the client, but I only found code for creating stream responses on the server.


Solution

Thanks to Martijn Pieters' answer, I stopped working around python-requests' behavior and looked for a completely different approach.

I ended up using pyCurl. You can use it much like a select+recv loop, without inverting the control flow and giving up control to a dedicated IO loop as in Tornado, etc. This makes it easy to write a generator that yields new lines as soon as they arrive, without further buffering in intermediate layers that could introduce delay, and without additional threads that run the IO loop.

At the same time, it is high-level enough that you don't need to bother with chunked transfer encoding, SSL encryption or gzip compression.

This was my old code, where chunk_size=1 resulted in 45% CPU load and chunk_size>1 introduced additional lag.

import requests

class RequestsHTTPStream(object):
    def __init__(self, url):
        self.url = url

    def iter_lines(self):
        headers = {'Cache-Control':'no-cache',
                   'Accept': 'text/event-stream',
                   'Accept-Encoding': 'gzip'}
        response = requests.get(self.url, stream=True, headers=headers)
        return response.iter_lines(chunk_size=1)

Here is my new code based on pyCurl. Unfortunately, perform() in the curl_easy_* style blocks completely, which makes it difficult to yield lines in between without using threads. Thus I'm using the curl_multi_* methods.

import pycurl
# Python 2 modules; the Python 3 counterparts are urllib.error,
# http.client and io.BytesIO.
import urllib2
import httplib
import StringIO

class CurlHTTPStream(object):
    def __init__(self, url):
        self.url = url
        self.received_buffer = StringIO.StringIO()

        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)

        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)

        self.status_code = 0

    SELECT_TIMEOUT = 10

    def _any_data_received(self):
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

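    def _iter_chunks(self):
        while True:
            # Let libcurl process whatever is currently available on the socket.
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            # No active handles left: the transfer has finished or failed.
            if remaining == 0:
                break
            # Wait for socket activity, at most SELECT_TIMEOUT seconds.
            self.curlmulti.select(self.SELECT_TIMEOUT)

        self._check_status_code()
        self._check_curl_errors()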

    def _check_curl_errors(self):
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        # Same behaviour as requests' Response.iter_lines(...)

        pending = None
        for chunk in chunks:

            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()

            if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None

            for line in lines:
                yield line

        if pending is not None:
            yield pending

This code tries to fetch as many bytes as possible from the incoming stream, without blocking unnecessarily if there are only a few. In comparison, the CPU load is around 0.2%.
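The line-splitting step can be tried in isolation, without a server. The following is a sketch of a Python 3 port of _split_lines_from_chunks operating on bytes; the standalone function name and the sample chunks are made up for illustration.

```python
def split_lines_from_chunks(chunks):
    """Yield complete lines from an iterable of byte chunks.

    A line that is cut off at the end of a chunk is held back and
    joined with the beginning of the next chunk.
    """
    pending = None
    for chunk in chunks:
        if pending is not None:
            chunk = pending + chunk
        lines = chunk.splitlines()

        # If the chunk does not end with a line terminator, the last
        # line is incomplete and must wait for more data.
        if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]:
            pending = lines.pop()
        else:
            pending = None

        for line in lines:
            yield line

    # Flush whatever is left when the stream ends.
    if pending is not None:
        yield pending


# The second event is split across two chunks.
sample_chunks = [b'{"id": 1}\n{"id"', b': 2}\n', b'{"id": 3}']
result = list(split_lines_from_chunks(sample_chunks))
```

Here result contains the three events as separate lines, with the second one reassembled from its two parts.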

Other tips

It is not requests' fault that your iter_lines() calls are blocking.

The Response.iter_lines() method calls Response.iter_content(), which calls urllib3's HTTPResponse.stream(), which calls HTTPResponse.read().

These calls pass along a chunk-size, which is what is passed on to the socket as self._fp.read(amt). This is the problematic call, as self._fp is a file object produced by socket.makefile() (as done by the httplib module); and this .read() call will block until amt (compressed) bytes are read.
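The difference between the file object and the raw socket can be sketched with a local socket pair in Python 3. This is an illustration under assumptions, not what urllib3 does: the socket pair stands in for the HTTP connection, and read1() is shown only as the buffered reader's "return what is there" call.

```python
import socket

server_side, client_side = socket.socketpair()
server_side.sendall(b'{"event": 1}\n')

# makefile() wraps the socket in a buffered file object.
fp = client_side.makefile("rb")

# fp.read(4096) would block here until 4096 bytes or EOF arrive.
# read1() issues at most one underlying read and returns what is available.
available = fp.read1(4096)

# readline() returns as soon as a full line has arrived.
server_side.sendall(b'{"event": 2}\n')
line = fp.readline()

fp.close()
server_side.close()
client_side.close()
```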

This low-level socket file object does support a .readline() call that will work more efficiently, but urllib3 cannot make use of this call when handling compressed data; line terminators are not going to be visible in the compressed stream.
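Because line boundaries only become visible after decompression, a client has to decompress incrementally and split lines afterwards. The following Python 3 sketch uses zlib for both sides. It assumes that the server flushes the gzip stream after every event; the sample events are made up.

```python
import zlib

GZIP_WBITS = 16 + zlib.MAX_WBITS  # tells zlib to use the gzip container

events = [b'{"event": 1}\n', b'{"event": 2}\n']

# Server side (assumption): one gzip stream, flushed after every event so
# that each event can be decoded without waiting for later data.
compressor = zlib.compressobj(6, zlib.DEFLATED, GZIP_WBITS)
wire_chunks = [
    compressor.compress(event) + compressor.flush(zlib.Z_SYNC_FLUSH)
    for event in events
]

# Client side: feed the compressed bytes as they arrive, in whatever
# sizes the network delivers them, and look for "\n" in the output.
decompressor = zlib.decompressobj(GZIP_WBITS)
decoded = [decompressor.decompress(chunk) for chunk in wire_chunks]
```

Each entry of decoded is the complete event that was flushed in the corresponding compressed chunk, even though the compressed chunks have different and unpredictable lengths.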

Unfortunately, urllib3 won't call self._fp.readline() when the response isn't compressed either; the way the calls are structured, it would be hard to pass along that you want to read in line-buffering mode instead of the chunk-buffering mode it uses now.

I must say that HTTP is not the best protocol to use for streaming events; I'd use a different protocol for this. Websockets spring to mind, or a custom protocol for your specific use-case.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow