Вопрос

Inspired by ipython-notebook-proxy, and based on ipydra, and extending the latter to support more complex user authentication as well as a proxy, because in my use case, only port 80 can be exposed.

I am using flask-sockets for the gunicorn worker, but I am having troubles to proxy WebSockets. IPython uses three different WebSockets connections, /shell, /stdin, and /iopub, but I am only able to get the 101 Switching Protocols for the first two. And /stdin receives a Connection Close Frame as soon as is created.

This is the excerpt code in question:

# Flask imports...
from werkzeug import LocalProxy
from ws4py.client.geventclient import WebSocketClient

# I use my own LocalProxy because flask-sockets does not support Werkzeug Rules
websocket = LocalProxy(lambda: request.environ.get('wsgi.websocket', None))
websockets = {}

PROXY_DOMAIN = "127.0.0.1:8888"  # IPython host and port
methods = ["GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH",
           "CONNECT"]


@app.route('/', defaults={'url': ''}, methods=methods)
@app.route('/<path:url>', methods=methods)
def proxy(url):
    with app.test_request_context():
        if websocket:
            while True:
                data = websocket.receive()
                websocket_url = 'ws://{}/{}'.format(PROXY_DOMAIN, url)
                if websocket_url not in websockets:
                    client = WebSocketClient(websocket_url,
                                             protocols=['http-only', 'chat'])
                    websockets[websocket_url] = client
                else:
                    client = websockets[websocket_url]
                client.connect()
                if data:
                    client.send(data)
                client_data = client.receive()
                if client_data:
                    websocket.send(client_data)
            return Response()

I also tried to create my own WebSocket proxy class, but it doesn't work either.

class WebSocketProxy(WebSocketClient):
    def __init__(self, to, *args, **kwargs):
        self.to = to
        print(("Proxy to", self.to))
        super(WebSocketProxy, self).__init__(*args, **kwargs)

    def opened(self):
        m = self.to.receive()
        print("<= %d %s" % (len(m), str(m)))
        self.send(m)

    def closed(self, code, reason):
        print(("Closed down", code, reason))

    def received_message(self, m):
        print("=> %d %s" % (len(m), str(m)))
        self.to.send(m)

Regular request-response cycle works like a charm, so I removed that code. If interested, the complete code is hosted in hidra.

I run the server with

$ gunicorn -k flask_sockets.worker hidra:app
Это было полезно?

Решение

Here is my solution(ish). It is crude, but should serve as a starting point for building websocket proxy. The full code is available in unreleased project, pyramid_notebook.

  • This uses ws4py and uWSGI instead of gunicorn

  • We use uWSGI's internal mechanism to receive downstream websocket message loop. There is nothing like WSGI for websockets in Python world (yet?), but looks like every web server implements its own mechanism.

  • A custom ws4py ProxyConnection is created which can combine ws4py event loop with uWSGI event loop

  • The thing is started and messages start fly around

  • This uses Pyramid request (based on WebOb), but this really shouldn't matter and code should be fine for any Python WSGI app with little modifications

  • As you can see, this does not really take advantage of asynchronicity, but just sleep() if there is nothing coming in from the socket

Code goes here:

"""UWSGI websocket proxy."""
from urllib.parse import urlparse, urlunparse
import logging
import time

import uwsgi
from ws4py import WS_VERSION
from ws4py.client import WebSocketBaseClient


#: HTTP headers we need to proxy to upstream websocket server when the Connect: upgrade is performed
CAPTURE_CONNECT_HEADERS = ["sec-websocket-extensions", "sec-websocket-key", "origin"]


logger = logging.getLogger(__name__)


class ProxyClient(WebSocketBaseClient):
    """Proxy between upstream WebSocket server and downstream UWSGI."""

    @property
    def handshake_headers(self):
        """
        List of headers appropriate for the upgrade
        handshake.
        """
        headers = [
            ('Host', self.host),
            ('Connection', 'Upgrade'),
            ('Upgrade', 'websocket'),
            ('Sec-WebSocket-Key', self.key.decode('utf-8')),
            # Origin is proxyed from the downstream server, don't set it twice
            # ('Origin', self.url),
            ('Sec-WebSocket-Version', str(max(WS_VERSION)))
            ]

        if self.protocols:
            headers.append(('Sec-WebSocket-Protocol', ','.join(self.protocols)))

        if self.extra_headers:
            headers.extend(self.extra_headers)

        logger.info("Handshake headers: %s", headers)
        return headers

    def received_message(self, m):
        """Push upstream messages to downstream."""

        # TODO: No support for binary messages
        m = str(m)
        logger.debug("Incoming upstream WS: %s", m)
        uwsgi.websocket_send(m)
        logger.debug("Send ok")

    def handshake_ok(self):
        """
        Called when the upgrade handshake has completed
        successfully.

        Starts the client's thread.
        """
        self.run()

    def terminate(self):
        raise RuntimeError("NO!")
        super(ProxyClient, self).terminate()

    def run(self):
        """Combine async uwsgi message loop with ws4py message loop.

        TODO: This could do some serious optimizations and behave asynchronously correct instead of just sleep().
        """

        self.sock.setblocking(False)
        try:
            while not self.terminated:
                logger.debug("Doing nothing")
                time.sleep(0.050)

                logger.debug("Asking for downstream msg")
                msg = uwsgi.websocket_recv_nb()
                if msg:
                    logger.debug("Incoming downstream WS: %s", msg)
                    self.send(msg)

                s = self.stream

                self.opened()

                logger.debug("Asking for upstream msg")
                try:
                    bytes = self.sock.recv(self.reading_buffer_size)
                    if bytes:
                        self.process(bytes)
                except BlockingIOError:
                    pass

        except Exception as e:
            logger.exception(e)
        finally:
            logger.info("Terminating WS proxy loop")
            self.terminate()


def serve_websocket(request, port):
    """Start UWSGI websocket loop and proxy."""
    env = request.environ

    # Send HTTP response 101 Switch Protocol downstream
    uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))

    # Map the websocket URL to the upstream localhost:4000x Notebook instance
    parts = urlparse(request.url)
    parts = parts._replace(scheme="ws", netloc="localhost:{}".format(port))
    url = urlunparse(parts)

    # Proxy initial connection headers
    headers = [(header, value) for header, value in request.headers.items() if header.lower() in CAPTURE_CONNECT_HEADERS]

    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)

    ws = ProxyClient(url, headers=headers)
    ws.connect()

    # Happens only if exceptions fly around
    return ""
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top