Question

I have an application that uses both grequests and multiprocessing.managers, combining IPC with asynchronous RESTful communication over HTTP.

It seems that grequests, by calling gevent.monkey's patch_all() method, breaks the multiprocessing.connection module used by the multiprocessing.managers.SyncManager class and its derivatives.

This is apparently not an isolated issue: it affects any use case that relies on multiprocessing.connection, such as multiprocessing.pool.

Drilling down into the code in gevent/monkey.py, I found that the swapping of the stdlib socket module with gevent.socket is what causes the breakage. This can be found at line 115 in gevent/monkey.py under the patch_socket() function:

def patch_socket(dns=True, aggressive=True):
    """Replace the standard socket object with gevent's cooperative sockets.
    ...
    _socket.socket = socket.socket # This line breaks multiprocessing.connection!
    ...

My question, then, is why this swap breaks multiprocessing.connection, and what advantages come from using gevent.socket instead of the stdlib's socket module. That is, what performance loss, if any, will I incur from not patching the socket module?

Traceback

Traceback (most recent call last):
  File "clientWithGeventMonkeyPatch.py", line 49, in <module>
    client = GetClient(host, port, authkey)
  File "clientWithGeventMonkeyPatch.py", line 39, in GetClient
    client.connect()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 500, in connect
    conn = Client(self._address, authkey=self._authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
    response = connection.recv_bytes(256)        # reject large message
IOError: [Errno 11] Resource temporarily unavailable

Code to reproduce the error

(On Ubuntu Server 11.10, Python 2.7.3, with gevent, greenlet, and grequests installed)

manager.py

## manager.py
import multiprocessing
import multiprocessing.managers
import datetime


class LocalManager(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'LocalManager'

def GetManager(host, port, authkey):
    def getdatetime():
        return '{}'.format(datetime.datetime.now())

    LocalManager.register('getdatetime', callable = getdatetime)
    manager = LocalManager(address = (host, port), authkey = authkey)
    manager.start()

    return manager

if __name__ == '__main__':
    # define our manager connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'

    # start a manager
    man = GetManager(host, port, authkey)

    # wait for user input to shut down
    raw_input('return to shutdown')
    man.shutdown()

client.py

## client.py -- this one works
import time
import multiprocessing.managers

class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'

    # connect to the running manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()

    # exit...
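
For readers who want a quick baseline without two terminals, here is a minimal single-process sketch of the same register/connect/call round trip. It is written to run on Python 3 and is not part of the original reproduction. Serving the manager from a background thread via get_server(), binding to port 0 so the OS picks a free port, and the helper name run_round_trip are all assumptions made for illustration; the original starts the manager with manager.start() on port 55555.

```python
import datetime
import threading
from multiprocessing.managers import SyncManager


class LocalManager(SyncManager):
    """Server side: knows the callable behind 'getdatetime'."""


class RemoteClient(SyncManager):
    """Client side: only knows the registered name."""


def getdatetime():
    return '{}'.format(datetime.datetime.now())


def run_round_trip(authkey=b'auth1234'):
    """Hypothetical helper: serve in a thread, connect, and fetch one value."""
    LocalManager.register('getdatetime', callable=getdatetime)
    RemoteClient.register('getdatetime')

    # Port 0 lets the OS choose a free port (assumption; the original uses 55555).
    manager = LocalManager(address=('127.0.0.1', 0), authkey=authkey)
    server = manager.get_server()

    # Serve from a daemon thread instead of manager.start(), so that the
    # whole example stays inside one process.
    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True
    thread.start()

    # The listener is already bound, so server.address holds the real port.
    client = RemoteClient(address=server.address, authkey=authkey)
    client.connect()

    # Registered callables return a proxy; _getvalue() copies the referent.
    proxy = client.getdatetime()
    return proxy._getvalue()


if __name__ == '__main__':
    print('client.getdatetime()', run_round_trip())
```

If this succeeds in an unpatched interpreter, the manager handshake itself is working, and any later failure is more likely to come from the monkey patching.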

clientWithGeventMonkeyPatch.py

## clientWithGeventMonkeyPatch.py -- breaks, depending on patch_all() parameters        
import time
import multiprocessing.managers


# this part is copied from grequests
# bear in mind that it doesn't actually do anything in this module.
try:
    import gevent
    from gevent import monkey as curious_george
    from gevent.pool import Pool
except ImportError:
    raise RuntimeError('Gevent is required for grequests.')

# this line breaks the multiprocessing.managers connection auth method:
# Monkey-patch.
# patch_all() parameters with default values:  socket=True, dns=True, time=True, select=True, thread=True, os=True, ssl=True, aggressive=True

curious_george.patch_all(thread=False, select=False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = False) # works!
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = True) # same as (thread=False, select=False); breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = True, dns = False) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = True) # breaks
#~ curious_george.patch_all(thread=False, select=False, socket = True, aggressive = False, dns = False) # breaks


class RemoteClient(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        multiprocessing.managers.SyncManager.__init__(self, *args, **kwargs)
        self.__type__ = 'RemoteClient'

def GetClient(host, port, authkey):
    RemoteClient.register('getdatetime')
    client = RemoteClient(address = (host, port), authkey = authkey)
    client.connect()
    return client

if __name__ == '__main__':
    # define our client connection parameters
    port = 55555
    host = 'localhost'
    authkey = 'auth1234'

    # connect to the running manager
    client = GetClient(host, port, authkey)
    print 'connected', client
    print 'client.getdatetime()', client.getdatetime()
    # wait a couple of seconds, then do it again
    time.sleep(2)
    print 'client.getdatetime()', client.getdatetime()

    # exit...

Solution

If you don't patch the socket module, gevent cannot make network operations cooperative (non-blocking), so you lose most of the benefit of using gevent in the first place.

gevent and multiprocessing aren't really designed to play nicely with one another. gevent mostly assumes that all of your network connections go through it, rather than bypassing the high-level Python socket interface, which is what multiprocessing does.
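
To make the "bypassing" point concrete, the sketch below reproduces the same class of error without gevent or multiprocessing. It rests on an assumption about the mechanism: a cooperative socket keeps its underlying file descriptor in non-blocking mode, and code that reads that descriptor directly, instead of going through the socket object, gets EAGAIN whenever no data has arrived yet. Here setblocking(False) stands in for the patched socket and os.read() stands in for the low-level read; the function name read_from_nonblocking_fd is made up for illustration, and the example assumes a POSIX system.

```python
import errno
import os
import socket


def read_from_nonblocking_fd():
    """Return the errno raised by a raw read on an empty non-blocking socket."""
    left, right = socket.socketpair()
    try:
        # Stand-in for a cooperative socket: the descriptor is non-blocking.
        left.setblocking(False)
        try:
            # Read the raw descriptor, bypassing the Python socket object,
            # before the peer has sent anything.
            os.read(left.fileno(), 256)
        except OSError as exc:
            return exc.errno
        return None
    finally:
        left.close()
        right.close()


if __name__ == '__main__':
    code = read_from_nonblocking_fd()
    print(code, errno.errorcode.get(code), os.strerror(code))
```

On Linux, errno.EAGAIN is 11, the same number that appears in the traceback in the question. A blocking socket would simply wait for the peer's reply at this point.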

Licensed under: CC-BY-SA with attribution