I'm using the Python multiprocessing module to run some long-running tasks in parallel. I start each job with the start() method, but once the jobs have returned I'd like to run them again.

Is it possible to reuse the Process objects I create, or do I have to create a new Process object every time I want to run the job?

There is this section from the Python docs suggesting I cannot use the start() method more than once, but perhaps someone knows of another way to reuse the instance:

start()

Start the process’s activity.

This must be called at most once per process object. It arranges for the object’s run() method to be invoked in a separate process.
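
A quick check confirms the guard; on CPython the second start() call trips an assertion ("cannot start a process twice"), though the exact exception type is an implementation detail:

import multiprocessing

def work():
    pass

if __name__ == "__main__":
    p = multiprocessing.Process(target=work)
    p.start()
    p.join()
    p.start()  # AssertionError: cannot start a process twice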

This is my version of the Process class:

import multiprocessing

class Process(multiprocessing.Process):
    def __init__(self, result_queue, MCMCinstance):
        assert isinstance(MCMCinstance, MCMC)
        multiprocessing.Process.__init__(self)
        self.result_queue = result_queue
        self.mcmc = MCMCinstance
        self.interface = C_interface(self.mcmc)
        self.burn_in = False

    def run(self):
        if self.burn_in:
            self.interface.burn_in()
        self.interface.sample(self.mcmc.options.runs)
        self.interface.update(self.mcmc)
        self.result_queue.put(self.mcmc)

Then I instantiate the Processes and run them using the start() method:

# setup the jobs and run
result_queue = multiprocessing.Queue()

mcmc1 = MCMC(options, donors, clusters)
mcmc2 = MCMC(options, donors, clusters)
mcmc3 = MCMC(options, donors, clusters)
mcmc4 = MCMC(options, donors, clusters)

p1 = Process(result_queue, mcmc1)
p2 = Process(result_queue, mcmc2)
p3 = Process(result_queue, mcmc3)
p4 = Process(result_queue, mcmc4)

jobs = [p1, p2, p3, p4]

for job in jobs:
    job.start()

results = [result_queue.get() for job in jobs]

Solution

As the documentation says, you can only call the start() method once per Process object, so I believe I have to create new processes each time:

# setup the jobs and run
result_queue = multiprocessing.Queue()

mcmc1 = MCMC(options, donors, clusters)
mcmc2 = MCMC(options, donors, clusters)
mcmc3 = MCMC(options, donors, clusters)
mcmc4 = MCMC(options, donors, clusters)

p1 = Process(result_queue, mcmc1)
p2 = Process(result_queue, mcmc2)
p3 = Process(result_queue, mcmc3)
p4 = Process(result_queue, mcmc4)

jobs = [p1, p2, p3, p4]

for job in jobs:
    #job.debug_level = 1
    job.start()

results = [result_queue.get() for job in jobs]

#for res in results: res.traceplot(show=False)
p5 = Process(result_queue, results[0])
p6 = Process(result_queue, results[1])
p7 = Process(result_queue, results[2])
p8 = Process(result_queue, results[3])

jobs2 = [p5, p6, p7, p8]

for j in jobs2:
    j.start()


results2 = [result_queue.get() for job in jobs2]
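
Since the pattern is identical each round, it can be wrapped in a helper that builds fresh Process objects every time. A sketch of that idea (untested, using the same Process class as above):

def run_round(result_queue, instances):
    # fresh Process objects are created for every round
    jobs = [Process(result_queue, mcmc) for mcmc in instances]
    for job in jobs:
        job.start()
    return [result_queue.get() for _ in jobs]

results = run_round(result_queue, [mcmc1, mcmc2, mcmc3, mcmc4])
results2 = run_round(result_queue, results)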

Other tips

To reuse processes, you should use a Pool. Something like this should probably work, though I haven't tested it yet.

import multiprocessing

SENTINEL = "SENTINEL"

class Worker(object):
    def __init__(self, result_queue, MCMCinstance):
        assert isinstance(MCMCinstance, MCMC)
        self.result_queue = result_queue
        self.mcmc = MCMCinstance
        self.interface = C_interface(self.mcmc)
        self.burn_in = False

    def run(self):
        if self.burn_in:
            self.interface.burn_in()
        self.interface.sample(self.mcmc.options.runs)
        self.interface.update(self.mcmc)
        # Placeholder condition: put SENTINEL in the queue to make a worker
        # loop exit, or put the updated instance back to process it again
        if True:
            self.result_queue.put(SENTINEL)
        else:
            self.result_queue.put(self.mcmc)

def run(result_queue):
    # Runs in each pool worker as the initializer: consume MCMC instances
    # from the queue until a SENTINEL arrives
    while True:
        instance = result_queue.get(True)
        if instance == SENTINEL:
            break
        worker = Worker(result_queue, instance)
        worker.run()

if __name__ == "__main__":
    result_queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(3, run, (result_queue,))  # a pool of 3 worker processes, each running run()

    mcmc1 = MCMC(options, donors, clusters)
    mcmc2 = MCMC(options, donors, clusters)
    mcmc3 = MCMC(options, donors, clusters)
    mcmc4 = MCMC(options, donors, clusters)

    result_queue.put(mcmc1)  
    result_queue.put(mcmc2)  
    result_queue.put(mcmc3)  
    result_queue.put(mcmc4)  

    pool.close()
    pool.join()
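
One subtlety worth flagging: each worker loop exits the first time it sees a SENTINEL, so shutting down all the workers requires one SENTINEL per worker. A minimal sketch of that shutdown (my assumption about the intended use, untested):

N_WORKERS = 3  # must match the pool size above

# enqueue the real work first ...
for mcmc in (mcmc1, mcmc2, mcmc3, mcmc4):
    result_queue.put(mcmc)

# ... then one SENTINEL per worker so every loop terminates
for _ in range(N_WORKERS):
    result_queue.put(SENTINEL)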

No, it is not possible. There is a specific guard against it in start().

I can only speculate about why it is not reusable, but I think it's a design choice: making the object recyclable would probably add more logic to the class than it's worth. But I think it's much more interesting to ask why that is.

Though, after reading the sources for the past 20 minutes, I can say with confidence that forking a whole Python process takes much more time than creating a new instance of the object, so reuse wouldn't matter anyway in terms of performance.
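
If you want to verify that on your own machine, a rough micro-benchmark is easy to write (a sketch; absolute numbers will vary with platform and start method):

import multiprocessing
import time

def noop():
    pass

if __name__ == "__main__":
    # cost of merely constructing Process objects
    t0 = time.perf_counter()
    for _ in range(1000):
        multiprocessing.Process(target=noop)
    print("construct only: %.6f s per object" % ((time.perf_counter() - t0) / 1000))

    # cost of actually spawning/forking and reaping a process
    t0 = time.perf_counter()
    for _ in range(100):
        p = multiprocessing.Process(target=noop)
        p.start()
        p.join()
    print("start + join:   %.6f s per process" % ((time.perf_counter() - t0) / 100))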

As for your code, you could compress it a little: you don't have to use named Process instances, and you can take advantage of list comprehensions.

# setup the jobs and run
result_queue = multiprocessing.Queue()

mcmc_list = [MCMC(options, donors, clusters) for _ in range(4)]  # four independent instances; [MCMC(...)] * 4 would alias one object
jobs = [Process(result_queue, mcmc) for mcmc in mcmc_list]
for job in jobs:
    #job.debug_level = 1
    job.start()

results = [result_queue.get() for job in jobs]

#for res in results: res.traceplot(show=False)
jobs2 = [Process(result_queue, result) for result in results]

for j in jobs2:
    j.start()

results2 = [result_queue.get() for job in jobs2]

EDIT: I also think you're slightly misusing Queue: it's for communicating between processes, and I don't think you need that here. To make a pool of worker processes, you should use Pool and Pool.map. However, I can't give an exact code example without seeing the original target function; I think it would need to be adjusted.
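
That said, a rough illustration of the Pool.map approach, assuming the work can be moved into a top-level function (a sketch only; the MCMC instances must be picklable, and the sample/update calls are taken from the question's code):

import multiprocessing

def run_mcmc(mcmc):
    # top-level function so it can be pickled and sent to the workers
    interface = C_interface(mcmc)
    interface.sample(mcmc.options.runs)
    interface.update(mcmc)
    return mcmc

if __name__ == "__main__":
    mcmcs = [MCMC(options, donors, clusters) for _ in range(4)]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(run_mcmc, mcmcs)     # first round
        results2 = pool.map(run_mcmc, results)  # same pool, second round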
