Question

My routine below takes a list of urllib2.Requests and spawns a new process per request to fire them off. The purpose is asynchronous speed, so it's all fire-and-forget (no response needed). The issue is that the processes spawned by the code below never terminate, so after a few of these the box will OOM. Context: Django web app. Any help?

import logging
import multiprocessing
import traceback
import urllib2

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)


def request_manager(req_list):
    try:
        # put request list in the queue
        for req in req_list:
            MPQ.put(req)

            # call processes on queue
            worker = multiprocessing.Process(target=process_request, args=(MPQ,))
            worker.daemon = True
            worker.start()

        # move on after queue is empty
        MPQ.join()

    except Exception, e:
        logging.error(traceback.format_exc())


# process requests in queue
def process_request(MPQ):
    try:
        while True:
            req = MPQ.get()
            dr = urllib2.urlopen(req)
            MPQ.task_done()

    except Exception, e:
        logging.error(traceback.format_exc())

Solution 4

Ok, after some fiddling (and a good night's sleep) I believe I've figured out the problem (and thank you Eri, you were the inspiration I needed). The main cause of the zombie processes was that I was not signaling back that a process was finished, and not killing it, both of which I (naively) thought were happening automagically with multiprocessing.

The code that worked:

import logging
import multiprocessing
import sys
import traceback
import urllib2

# function that will be run through the pool
def process_request(req):
    try:
        dr = urllib2.urlopen(req, timeout=30)

    except Exception, e:
        logging.error(traceback.format_exc())

# process killer -- fired as the map_async callback once the batch completes
def sig_end(r):
    sys.exit()

# globals (the pool must be created after process_request is defined)
MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
CHUNK_SIZE = 20
POOL = multiprocessing.Pool(MP_CONCURRENT)

# pool initiator
def request_manager(req_list):
    try:
        resp = POOL.map_async(process_request, req_list, CHUNK_SIZE, callback=sig_end)

    except Exception, e:
        logging.error(traceback.format_exc())

A couple of notes:

1) The function that will be hit by "map_async" ("process_request" in this example) must be defined first, before the global declarations; otherwise the worker processes forked when the pool is created won't have it and can't look it up by name.

2) There is probably a more graceful way to exit the process than calling sys.exit() in the callback (suggestions welcome); one possible alternative is sketched after these notes.

3) Using a pool here really was best (thanks again Eri) because of the "callback" feature, which lets me throw the exit signal as soon as the batch completes.
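
Regarding note 2, here is a minimal sketch of one possible alternative (not from the original answer; it assumes the same MP_CONCURRENT, CHUNK_SIZE and process_request as above): give each batch its own short-lived pool, close it immediately, and reap it in a background thread instead of calling sys.exit() from a callback.

import threading

def request_manager(req_list):
    try:
        # a fresh pool per batch; its workers exit once the batch is drained
        pool = multiprocessing.Pool(MP_CONCURRENT)
        pool.map_async(process_request, req_list, CHUNK_SIZE)
        pool.close()  # no more tasks will be submitted to this pool
        # join in a background thread so the Django request isn't blocked
        threading.Thread(target=pool.join).start()

    except Exception, e:
        logging.error(traceback.format_exc())

This avoids raising SystemExit inside the pool's result-handler thread, at the cost of forking a fresh pool for every batch.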

OTHER TIPS

Maybe I am not right, but:

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)


def request_manager(req_list):
    try:
        # put request list in the queue
        pool = []
        for req in req_list:
            MPQ.put(req)

            # call processes on queue
            worker = multiprocessing.Process(target=process_request, args=(MPQ,))
            worker.daemon = True
            worker.start()
            pool.append(worker)

        # move on after queue is empty
        MPQ.join()
        # terminate the workers that are no longer needed
        for p in pool: p.terminate()

    except Exception, e:
        logging.error(traceback.format_exc())


# process requests in queue
def process_request(MPQ):
    try:
        while True:
            req = MPQ.get()
            dr = urllib2.urlopen(req)
            MPQ.task_done()

    except Exception, e:
        logging.error(traceback.format_exc())
Or, using a pool instead of spawning a process per request:

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
CHUNK_SIZE = 20  # number of requests sent to one process
pool = multiprocessing.Pool(MP_CONCURRENT)

def request_manager(req_list):
    try:
        # put the request list through the pool
        response = pool.map(process_request, req_list, CHUNK_SIZE)  # blocks until all requests have been called and the pool work has ended
    # OR
        response = pool.map_async(process_request, req_list, CHUNK_SIZE)  # request_manager returns as soon as all requests are handed to the pool

    except Exception, e:
        logging.error(traceback.format_exc())


# process a single request (run in a pool worker)
def process_request(req):
    dr = urllib2.urlopen(req)

This works ~5-10x faster than your code.
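
For reference, a hypothetical call site (not part of either answer) would simply hand request_manager a list of urllib2.Request objects, fire-and-forget:

reqs = [urllib2.Request("http://example.com/ping"),
        urllib2.Request("http://example.com/track?event=signup")]
request_manager(reqs)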

Integrate a side broker into Django (such as RabbitMQ or something like it); see the sketch below.
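
A minimal sketch of what that could look like, assuming Celery on top of a RabbitMQ broker (Celery is not named above; it is just one common choice, and the names here are illustrative):

# tasks.py -- the broker's workers do the HTTP calls, so the Django
# process only enqueues messages and returns immediately
import logging
import traceback
import urllib2

from celery import Celery

app = Celery('firehose', broker='amqp://guest@localhost//')

@app.task(ignore_result=True)
def fire_request(url):
    try:
        urllib2.urlopen(url, timeout=30)
    except Exception, e:
        logging.error(traceback.format_exc())

# in the Django view: pass plain URLs (urllib2.Request objects don't
# serialize well through a broker) and fire-and-forget
# for url in url_list:
#     fire_request.delay(url)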

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow