Question

I'm attempting to use a combination of CherryPy + multiprocessing (to launch worker 'processes') + gevent (to launch parallel I/O greenlets from within the worker 'processes'). It seems the easiest way of doing this is to monkey-patch multiprocessing, since greenlets can only operate within the main application process.

However, it looks like the monkey patching works for some parts of multiprocessing and not others. Here is my sample CherryPy server:

from gevent import monkey
monkey.patch_all()

import gevent
import cherrypy
import multiprocessing

def launch_testfuncs():
    jobs = [gevent.spawn(testfunc)
            for i in range(0, 12)]

    gevent.joinall(jobs, timeout=10)

def testfunc():
    print 'testing'

class HelloWorld(object):
    def index(self):
        launch_testfuncs()

        return "Hello World!"
    index.exposed = True

    def index_proc(self):
        proc = multiprocessing.Process(target=launch_testfuncs)
        proc.start()
        proc.join()

        return "Hello World 2!"
    index_proc.exposed = True

    def index_pool(self):
        pool = multiprocessing.Pool(1)
        return "Hello World 3!"
    index_pool.exposed = True

    def index_namespace(self):
        manager = multiprocessing.Manager()
        anamespace = manager.Namespace()
        anamespace.val = 23
        return "Hello World 4!"
    index_namespace.exposed = True


cherrypy.quickstart(HelloWorld())

The following work after monkey patching:

  • index - just spawning greenlets from within the cherrypy class directly
  • index_proc - use multiprocessing.Process to launch a new process, then spawn the greenlets from that process

The following have issues:

  • index_pool - launch a multiprocessing.Pool - hangs and never returns
  • index_namespace - initialize a multiprocessing.Manager namespace to manage shared memory within a pool/collection of workers - returns the following error message:

    [15/Nov/2012:17:19:31] HTTP Traceback (most recent call last):
      File "/Library/Python/2.7/site-packages/cherrypy/_cprequest.py", line 656, in respond
        response.body = self.handler()
      File "/Library/Python/2.7/site-packages/cherrypy/lib/encoding.py", line 188, in __call__
        self.body = self.oldhandler(*args, **kwargs)
      File "/Library/Python/2.7/site-packages/cherrypy/_cpdispatch.py", line 34, in __call__
        return self.callable(*self.args, **self.kwargs)
      File "server.py", line 39, in index_namespace
        anamespace = manager.Namespace()
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp
        token, exp = self._create(typeid, *args, **kwds)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create
        conn = self._Client(self._address, authkey=self._authkey)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client
        answer_challenge(c, authkey)
      File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge
        response = connection.recv_bytes(256)        # reject large message
    IOError: [Errno 35] Resource temporarily unavailable
    

I tried to find documentation on this in the gevent docs, but couldn't find anything that addresses it. Is it just that gevent's monkey patching is incomplete? Has anyone else run into similar issues, and is there a way around it?


Solution

The problem seems to be a result of gevent.socket being non-blocking, meaning that a recv_bytes(X) call like the one in the traceback will raise that error if X bytes are not immediately available on the socket. gevent's socket is, by design, never allowed to block.
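To see where that errno comes from in isolation, here is a minimal stdlib-only sketch (no gevent or multiprocessing involved; the socketpair() is purely a stand-in for the Manager's connection): a non-blocking recv with no data pending raises exactly this "Resource temporarily unavailable" error.

import errno
import socket

# A connected pair of sockets, purely for illustration.
a, b = socket.socketpair()

# This is effectively what gevent does to the sockets it creates.
a.setblocking(False)

try:
    a.recv(256)   # nothing has been written to the other end yet
except socket.error as e:
    # On OS X/BSD errno 35 is EAGAIN ("Resource temporarily unavailable");
    # Linux reports the same condition as errno 11.
    print e.errno == errno.EAGAIN   # True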

The problem with multiprocessing arises because it uses the stdlib socket module and expects it to be blocking; after you've called monkey.patch_all(), the socket module has been replaced, and multiprocessing.connection is not designed to deal with the new asynchronous behaviour.
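You can verify the replacement directly; this quick check (not part of the server above) shows the stdlib name pointing at gevent's cooperative socket class after patching:

from gevent import monkey
monkey.patch_all()

import socket
import gevent.socket

# The stdlib name now refers to gevent's cooperative socket class, so
# multiprocessing picks up gevent sockets without knowing it.
print socket.socket is gevent.socket.socket   # True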

You can tell monkey not to patch the socket module, but anything in your application that was relying on asynchronous sockets may take a performance hit as a result.

To do this, call patch_all with socket=False: monkey.patch_all(socket=False).
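As a minimal sketch, the only change to the server at the top of the question would be the patching line; everything else stays the same:

from gevent import monkey

# Patch time, threading, select, etc., but leave the socket module alone so
# multiprocessing's Manager/Pool connections keep their blocking semantics.
monkey.patch_all(socket=False)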

This is not an ideal solution, though, as you lose much of the benefit you would have gained from using gevent in the first place.

OTHER TIPS

I ran into the same issue. My solution was to simply use multiprocessing.Process() to spawn a fixed number of processes, and then join them all and wait for them to complete.

#!/usr/bin/env python
# encoding: utf-8

from gevent import monkey
monkey.patch_all()

import gevent
import multiprocessing as mp


NUM = 10  # number of worker processes to spawn


def work(i):
    # Spawn a batch of greenlets inside this worker process and wait for them
    # all to finish. Use a separate loop variable so the process index `i`
    # is not overwritten by the comprehension.
    jobs = [gevent.spawn(func, j)
            for j in range(0, 12)]
    gevent.joinall(jobs)
    print "{} Done {}".format(mp.current_process().name, i)


def func(x):
    print "Gevent: {}".format(x)

def main():
    # A fixed number of plain multiprocessing.Process workers (no Pool or
    # Manager, which are the parts that break after monkey patching).
    processes = [mp.Process(name="Process-{}".format(i), target=work, args=(i,))
                 for i in xrange(NUM)]

    for process in processes:
        process.start()

    # Wait for every worker to finish.
    for process in processes:
        process.join()


if __name__ == '__main__':
    main()

output

Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-0 Done 0
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-1 Done 1
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-2 Done 2
Gevent: 0
... ...

So that is one way to get gevent working together with multiprocessing.
