Question

I'm currently working to add support for gevent-socketio to an existing Django project. I'm finding that the gevent.monkey.patch_all() call breaks the cancellation mechanism of a thread that is responsible for receiving data from a socket; we'll call the class SocketReadThread for now.

SocketReadThread is pretty simple: it calls recv() on a blocking socket. When it receives data, it processes it and calls recv() again. The thread stops when an exception occurs or when recv() returns 0 bytes, as happens when socket.shutdown(SHUT_RDWR) is called in SocketReadThread.stop_reading().

The problem occurs when gevent.monkey.patch_all() replaces the default socket implementation. Instead of shutting down nicely, I get the following exception:

error: [Errno 9] File descriptor was closed in another greenlet

I'm assuming this is occurring because gevent makes my socket non-blocking in order to work its magic. This means that when I call socket.shutdown(socket.SHUT_RDWR), the greenlet that was doing the work for the monkey-patched socket.recv call tries to read from the closed file descriptor.

I coded an example to isolate this issue:

from gevent import monkey

monkey.patch_all()

import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
    def __init__(self, socket):
        super(SocketReadThread, self).__init__()
        self._socket = socket

    def run(self):
        connected = True
        while connected:
            try:
                print "calling socket.recv"
                data = self._socket.recv(1024)
                if (len(data) < 1):
                    print "received nothing, assuming socket shutdown"
                    connected = False
                else:
                    print "Received something: {}".format(data)
            except socket.timeout as e:
                print "Socket timeout: {}".format(e)
                connected = False
            except:
                # Report anything unexpected, then re-raise with the original traceback.
                ex = sys.exc_info()[1]
                print "Unexpected exception occurred: {}".format(str(ex))
                raise

    def stop_reading(self):
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()


if __name__ == '__main__':

    sock = socket.socket()
    sock.connect(('127.0.0.1', 4242))

    st = SocketReadThread(sock)
    st.start()
    time.sleep(3)
    st.stop_reading()
    st.join()

If you open a terminal and run nc -lp 4242 & (to give this program something to connect to) and then run this program, you will see the exception mentioned above. If you remove the call to monkey.patch_all(), you will see that it works just fine.

My question is: how can I support cancellation of the SocketReadThread in a way that works with or without gevent monkey patching and doesn't require the use of an arbitrary timeout that would make cancellation slow (i.e. calling recv() with a timeout and checking a conditional)?

Solution

I found two different workarounds for this. The first is to simply catch and suppress the exception. This appears to work fine, since it is common practice for one thread to close a socket in order to make another thread exit from a blocking read. I don't understand why gevent would complain about this other than as a debugging aid; it is really just an annoyance.
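As a rough sketch of that first workaround (not the code I actually ran), the read loop can treat the error as a normal cancellation. The function name read_until_closed and the handle_data callback are made up for illustration, and the check assumes the error surfaces as a socket.error whose errno is EBADF (errno 9), as in the message from the question. Anything else is re-raised. It avoids print so that it runs on both Python 2 and Python 3.

```python
import errno
import socket


def read_until_closed(sock, handle_data):
    """Read from sock until it goes away; return why the loop ended.

    Hypothetical helper for illustration only.
    """
    while True:
        try:
            data = sock.recv(1024)
        except socket.error as e:
            # Assumption: a socket closed from elsewhere shows up as EBADF
            # (errno 9). Treat that as a cancellation request, not a failure.
            if e.errno == errno.EBADF:
                return "cancelled"
            raise
        if not data:
            # recv() returning no bytes means the connection was shut down.
            return "shutdown"
        handle_data(data)
```

A reader thread's run() could call this and simply return whichever reason comes back. The trade-off is that a genuine bad-descriptor bug would be suppressed as well.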

The second option is to use the self-pipe trick (a quick search yields many explanations) to wake up a blocked thread. Essentially, we create a second file descriptor for signaling cancellation (to the OS, a socket is just another kind of file descriptor). We then use select as our blocking call, waiting for either incoming data on the socket or a cancellation request on the cancellation file descriptor. See the example code below.

from gevent import monkey

monkey.patch_all()

import os
import select
import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
    def __init__(self, socket):
        super(SocketReadThread, self).__init__()
        self._socket = socket
        self._socket.setblocking(0)
        r, w = os.pipe()
        self._cancelpipe_r = os.fdopen(r, 'r')
        self._cancelpipe_w = os.fdopen(w, 'w')

    def run(self):
        connected = True
        read_fds = [self._socket, self._cancelpipe_r]
        while connected:
            print "Calling select"
            read_list, write_list, x_list = select.select(read_fds, [], [])
            print "Select returned"
            if self._cancelpipe_r in read_list :
                print "exiting"
                self._cleanup()
                connected = False
            elif self._socket in read_list:
                print "calling socket.recv"
                data = self._socket.recv(1024)
                if (len(data) < 1):
                    print "received nothing, assuming socket shutdown"
                    connected = False
                    self._cleanup()
                else:
                    print "Received something: {}".format(data)


    def stop_reading(self):
        print "writing to pipe"
        self._cancelpipe_w.write("\n")
        self._cancelpipe_w.flush()
        print "joining"
        self.join()
        print "joined"

    def _cleanup(self):
        self._cancelpipe_r.close()
        self._cancelpipe_w.close()
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()


if __name__ == '__main__':

    sock = socket.socket()
    sock.connect(('127.0.0.1', 4242))

    st = SocketReadThread(sock)
    st.start()
    time.sleep(3)
    st.stop_reading()

Again, before running the above program, run netcat -lp 4242 & to give it a listening socket to connect to.
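For trying the self-pipe idea without netcat, here is a smaller sketch that uses socket.socketpair() as a stand-in for the real connection. The names CancellableReader, data_seen, exit_reason and demo are invented for this illustration. It assumes a POSIX system (select on a pipe does not work on Windows), and I have not verified it against every gevent version. Unlike the program above, only the caller closes descriptors, and only after the thread has exited, so stop_reading() stays safe even if the peer disconnected first.

```python
import os
import select
import socket
import threading


class CancellableReader(threading.Thread):
    """Hypothetical reader thread, cancelled through a pipe rather than by closing the socket."""

    def __init__(self, sock):
        super(CancellableReader, self).__init__()
        self._sock = sock
        self._cancel_r, self._cancel_w = os.pipe()
        self.received = []
        self.data_seen = threading.Event()
        self.exit_reason = None

    def run(self):
        while self.exit_reason is None:
            # Block until the socket has data or the cancel pipe is written to.
            readable, _, _ = select.select([self._sock, self._cancel_r], [], [])
            if self._cancel_r in readable:
                self.exit_reason = "cancelled"
            elif self._sock in readable:
                data = self._sock.recv(1024)
                if data:
                    self.received.append(data)
                    self.data_seen.set()
                else:
                    self.exit_reason = "peer closed"

    def stop_reading(self):
        # One byte on the pipe makes select() return in the reader thread.
        os.write(self._cancel_w, b"x")
        self.join()
        # Descriptors are closed here, after the reader thread has finished.
        os.close(self._cancel_r)
        os.close(self._cancel_w)
        self._sock.close()


def demo():
    """Exercise the reader over a local socket pair."""
    ours, theirs = socket.socketpair()
    reader = CancellableReader(ours)
    reader.start()
    theirs.sendall(b"ping")
    reader.data_seen.wait(5)  # give the reader a chance to see the data first
    reader.stop_reading()
    theirs.close()
    return reader.received, reader.exit_reason
```

Calling demo() should return the received chunks together with the reason the loop ended. Cancellation takes effect as soon as select returns, so no polling timeout is involved.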

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow