Question

I'm trying to have a pool of worker processes in my Pyramid application that can be used to carry out CPU-intensive (or long-running) background tasks that I don't want to bog down the views with. What I have right now works, but there is one problem: If waitress exits from termination (like it happens with --reload) the workers keep lingering, and I don't know how to signal them to stop.

Edit: This doesn't seem to be an issue when using Gunicorn (or just running it from some file). Could this be a bug with Waitress?

Edit2: Well. Or Gunicorn just handles this differently making it look better.

import multiprocessing as mp
from queue import Empty as QueueEmptyError

class MyPool(object):

    def __init__(self, processes=10, queue=None):
        self.jobqueue = queue if queue is not None else mp.Queue()
        self.procs = []
        self.running = True
        for i in range(processes):
            worker = mp.Process(target=self._worker, daemon=True)
            self.procs.append(worker)
            worker.start()

    def __del__(self):
        self.stopall()

    def stopall(self):
        self.running = False
        for worker in self.procs:
            worker.join()

    def _worker(self):
        while self.running:
            try:
                self._dojob(self.jobqueue.get(True, 1))
            except QueueEmptyError:
                pass

    def _dojob(self, taskdata):
        print(str(taskdata) + ' is happening')

class PoolMaster(object):
    def __init__(self):
        self.pools = []
        self.aqueue = mp.Queue()
        self.apool = MyPool(6, self.aqueue)
        self.pools.append(self.apool)

    def __del__(self):
        for pool in self.pools:
            pool.stopall()

    def do_something(self):
        self.aqueue.put_nowait('Something')

PoolMaster is instantiated once in my project's main() function and exposed to all views by adding it to all events.

What I tried before was adding a "poison pill" to the queue when __del__ happens, but as it turns out __del__ doesn't seem to get called at all. I don't want to use multiprocessing's own Pool because they seem to be made for running through a set workload once, not constantly working on a queue. So, how do I stop them from running after the actual application has exited?

Was it helpful?

Solution 2

Alright, I found a way to make it happen by changing the _worker function:

def _worker(self):
    while True:
        try:
            self._dojob(self.jobqueue.get(True, 3))
        except QueueEmptyError:
            pass
        if os.getppid() == 1:
            print('Parent process died, stopping worker.')
            sys.exit(0)

When the parent process dies without ending the daemon _worker processes os.getppid() will return 1 instead of the parents ID on UNIX-like systems. Every 3 seconds it stops waiting for new objects in the queue and checks for that.

I don't feel like this is the proper way though.

OTHER TIPS

You can use a Pipe for each of your workers in your MyPool (say, cancelPipe). Then, your worker would periodically listen to the Pipe for some cancellation code (say, True). To use a pipe in your worker process, you would do something like this:

connections = mp.Pipe()
self.cancelPipes.append(connections[0])
worker = mp.Process(target=self._worker, daemon=True, args=[connections[1]])

Then, when you want to cancel everyone, do:

for c in self.cancelPipes:
    c.send(True)

The workers would check to see if they should stop with this:

gotSomething = cancelPipe.poll()
if gotSomething:
    cancel = cancelPipe.recv()
    if cancel:
        # Stop your worker
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top