Question

I'm trying to stream Server-Sent Events from my Pyramid application, but I can't figure out how to stream the response body from my view. Here's the test view I'm using (it totally doesn't implement SSE, it's just to work out the streaming portion):

from pyramid.view import view_config

@view_config(route_name='iter_test')
def iter_test(request):
    import time
    def test_iter():
        i = 0
        while True:
            i += 1
            if i == 5:
                # End the generator; raising StopIteration here is an error on Python 3.7+.
                return
            yield str(time.time())
            print(time.time())
            time.sleep(1)

    return test_iter()

This produces ValueError: Could not convert return value of the view callable function pdiff.views.iter_test into a response object. The value returned was <generator object test_iter at 0x3dc19b0>.

I've tried return Response(app_iter=test_iter()) instead, which at least doesn't error out, but it doesn't stream the response - it waits until the generator has completed before returning the response to my browser.

I recognize that I could simply return a single event per request and let clients reconnect after each event, but I'd prefer to preserve the realtime nature of Server-Sent Events by streaming multiple events from a single request, without the reconnection delay. How can I do this with Pyramid?


Solution

I've found the issue. Turns out my application code is fine, and the problem lies with Waitress and nginx:

  1. Waitress, the default web server Pyramid uses, buffers all output in 18000-byte chunks (see this issue for details).

  2. The source of the problem was hidden from me by nginx, the web server I put in front of my Pyramid application, which also buffers responses.

(1) can be solved by either:

  • Configuring waitress with send_bytes = 1 in your .ini file. This fixes the streaming problem, but makes your entire app super slow. As @Zitrax mentioned, you can recover some speed with higher values, but any value higher than 1 risks messages getting stuck in the buffer.

  • Switching to gunicorn. I don't know whether gunicorn just uses a smaller buffer, or if it behaves better with app_iter, but it worked, and kept my app fast.
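
To see why short messages stall, here is a toy model of threshold buffering. It assumes the server flushes only once the buffered byte count reaches send_bytes. The function name flush_points is made up for illustration, and this is not Waitress's actual code.

```python
def flush_points(chunk_sizes, send_bytes):
    """Toy model: return the indices of chunks after which a flush happens.

    The buffer is flushed once it holds at least `send_bytes` bytes.
    This only illustrates threshold buffering; it is not Waitress's code.
    """
    buffered = 0
    flushed_after = []
    for index, size in enumerate(chunk_sizes):
        buffered += size
        if buffered >= send_bytes:
            flushed_after.append(index)
            buffered = 0
    return flushed_after


# Five 13-byte timestamps never reach an 18000-byte threshold mid-stream...
print(flush_points([13] * 5, send_bytes=18000))  # []
# ...but with a threshold of 1, every chunk is flushed as soon as it is written.
print(flush_points([13] * 5, send_bytes=1))      # [0, 1, 2, 3, 4]
```

In the first case nothing is flushed until the response ends, which would match the "waits until the generator has completed" behaviour described in the question.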

(2) can be solved by configuring nginx to disable buffering for your stream routes.

You need to set proxy_buffering off in your nginx conf. This setting applies to sites hosted via proxy_pass. If you're not using proxy_pass you may need a different setting.

  • You may configure nginx to dynamically enable/disable buffering for each response based on request headers, as shown in this question on the topic (a good solution for EventSource/Server-Sent Events)

  • You may alternatively configure this in a location block in your nginx conf. This is good if you're using something besides EventSource and you're not expecting to receive a particular header, or if you are using EventSource, but want to debug the response from a plain browser tab, where you can't send the Accept header in your request.
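
A related option, not mentioned above and only a sketch: nginx also reads an X-Accel-Buffering response header from the proxied application, and a value of "no" turns buffering off for that one response (unless nginx has been configured to ignore the header). The helper name sse_response_headers is hypothetical.

```python
def sse_response_headers(disable_proxy_buffering=True):
    """Build the header list for a streaming SSE response."""
    headers = [
        ('Content-Type', 'text/event-stream'),
        ('Cache-Control', 'no-cache'),
    ]
    if disable_proxy_buffering:
        # nginx reads this response header and skips buffering for this response only.
        headers.append(('X-Accel-Buffering', 'no'))
    return headers
```

In a Pyramid view the result could be passed as Response(headerlist=sse_response_headers(), app_iter=...), which keeps the buffering decision next to the streaming code instead of in the nginx conf.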

OTHER TIPS

I ran some tests a while ago to try EventSource / Server-Sent Events. I just tested again and it still works fine with Pyramid 1.5a.

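import json

import zmq
from pyramid.response import Response
from pyramid.view import view_config

# SOCK (the publisher's address) is defined elsewhere in the application.
# A module-level ZeroMQ context is assumed here.
context = zmq.Context()

@view_config(route_name='events')
def events(request):
    headers = [('Content-Type', 'text/event-stream'),
               ('Cache-Control', 'no-cache')]
    response = Response(headerlist=headers)
    response.app_iter = message_generator()
    return response

def message_generator():
    socket2 = context.socket(zmq.SUB)
    socket2.connect(SOCK)
    # Subscribe to every message (empty prefix).
    socket2.setsockopt(zmq.SUBSCRIBE, b'')
    while True:
        msg = socket2.recv()
        # app_iter must yield bytes; each SSE event ends with a blank line.
        payload = json.dumps({'message': msg.decode('utf-8')})
        yield ('data: %s\n\n' % payload).encode('utf-8')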

Full example here: https://github.com/antoineleclair/zmq-sse-chat. Have a look at https://github.com/antoineleclair/zmq-sse-chat/blob/master/sse/views.py.

I'm not sure exactly why mine works and yours doesn't. Maybe it's the headers, or the two '\n' after each message. By the way, according to the EventSource spec, each line of an event's payload must be prefixed with data: and each event must end with a blank line (\n\n).
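
As a minimal sketch of that wire format, the helper below encodes one event. The name format_sse is made up for illustration; the optional event: field is part of the EventSource spec but is not used in the view above.

```python
def format_sse(data, event=None):
    """Encode one Server-Sent Event as bytes."""
    lines = []
    if event is not None:
        lines.append('event: %s' % event)
    # A multi-line payload needs one "data:" field per line.
    for line in data.split('\n'):
        lines.append('data: %s' % line)
    # The trailing blank line marks the end of the event.
    return ('\n'.join(lines) + '\n\n').encode('utf-8')


print(format_sse('hello'))  # b'data: hello\n\n'
```

A generator passed as app_iter could then yield format_sse(...) for each message instead of building the string by hand.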

If you don't specify a renderer for your view, you have to return a Response object. Pyramid's Response object accepts an app_iter argument for returning iterators, so you should do it like this:

import time

from pyramid.response import Response
from pyramid.view import view_config


@view_config(route_name='iter_test')
def iter_test(request):

    def test_iter():
        for _ in range(5):
            # app_iter must yield bytes on Python 3.
            yield str(time.time()).encode('utf-8')
            print(time.time())
            time.sleep(1)

    return Response(app_iter=test_iter())

I also edited your code a little to be more readable.

UPDATE

"I've tried return Response(app_iter=test_iter()) instead, which at least doesn't error out, but it doesn't stream the response - it waits until the generator has completed before returning the response to my browser."

I guess the problem is buffering. Try sending a much larger response, with chunks big enough to fill the server's buffer, and see whether it starts streaming.
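
One way to test that guess is to pad every chunk so that it fills the buffer on its own. The sketch below assumes the 18000-byte figure quoted for Waitress in the accepted answer; padded_chunks and its parameters are hypothetical names.

```python
import time


def padded_chunks(count, pad_to=18000, delay=1.0):
    """Yield `count` timestamp chunks, each padded to at least `pad_to` bytes."""
    for _ in range(count):
        body = str(time.time()).encode('utf-8')
        # Pad with spaces so each chunk alone is large enough to fill the buffer.
        yield body.ljust(pad_to, b' ') + b'\n'
        time.sleep(delay)
```

Returning Response(app_iter=padded_chunks(5)) from the view should then show one chunk per second in the browser if buffering is the cause.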

Licensed under: CC-BY-SA with attribution