Question

While using:

def myFunction(arg):
    for i in range(10000):
        pass

from multiprocessing import Pool
pool = Pool(processes=3)
pool.map_async( myFunction, ['first','second','third'] )

I want the user to be able to pause the Pool's execution at any time after it has been started, and later resume it so that the remaining items in the Pool are processed. How can I achieve this?

EDIT:

Here is the working implementation of suggestions posted by Blckknght. Thanks Blckknght!

import sys
import multiprocessing
from PyQt4 import QtGui, QtCore


def setup(event):
    global unpaused
    unpaused = event

def myFunction( arg=None):
    unpaused.wait()  # blocks while paused; checked once per task, so a pause takes effect between tasks
    print "Task started...", arg
    for i in range(15000000):
        pass
    print '...task completed.', arg


class MyApp(object):

    def __init__(self):
        super(MyApp, self).__init__()

        app = QtGui.QApplication(sys.argv)
        self.mainWidget = QtGui.QWidget()
        self.mainLayout = QtGui.QVBoxLayout()
        self.mainWidget.setLayout(self.mainLayout)

        self.groupbox = QtGui.QGroupBox()
        self.layout = QtGui.QVBoxLayout()
        self.groupbox.setLayout(self.layout)

        self.pauseButton = QtGui.QPushButton('Pause')
        self.pauseButton.clicked.connect(self.pauseButtonClicked)

        self.okButton = QtGui.QPushButton('Start Pool')
        self.okButton.clicked.connect(self.startPool)
        self.layout.addWidget(self.okButton)
        self.layout.addWidget(self.pauseButton)

        self.mainLayout.addWidget(self.groupbox)
        self.mainWidget.show()
        sys.exit(app.exec_())


    def startPool(self):
        self.event = multiprocessing.Event()
        self.pool=multiprocessing.Pool(1, setup, (self.event,))
        self.result=self.pool.map_async(myFunction, [1,2,3,4,5,6,7,8,9,10])
        self.event.set()
        # self.result.wait()       

    def pauseJob(self):
        self.event.clear()

    def continueJob(self):
        self.event.set()

    def pauseButtonClicked(self):
        if self.pauseButton.text()=='Pause':
            print '\n\t\t ...pausing job...','\n'
            self.pauseButton.setText('Resume')
            self.pauseJob()
        else:
            print '\n\t\t ...resuming job...','\n'
            self.pauseButton.setText('Pause')            
            self.continueJob()

if __name__ == '__main__':
    MyApp()

Solution

It sounds like you want to use a multiprocessing.Event to control the running of your worker function. You can create one, pass it to the pool's initializer so that every worker process stores a reference to it, and then wait on it in myFunction.

Here's an example that runs workers that print their argument every second. The workers can be paused by clearing the event, and restarted by setting it again.

from time import sleep
import multiprocessing

def setup(event):
    global unpaused
    unpaused = event

def myFunction(arg):
    for i in range(10):
        unpaused.wait()
        print(arg)
        sleep(1)

if __name__ == "__main__":
    event = multiprocessing.Event() # initially unset, so workers will be paused at first
    pool = multiprocessing.Pool(3, setup, (event,))
    result = pool.map_async(myFunction, ["foo", "bar", "baz"])
    event.set()   # unpause workers
    sleep(5)
    event.clear() # pause after five seconds
    sleep(5)
    event.set()   # unpause again after five more seconds
    result.wait() # wait for the rest of the work to be completed

The worker processes should print "foo", "bar" and "baz" ten times each, with a one-second delay between repetitions. The workers are paused after the first five seconds and resumed five seconds later. Note that a pause only takes effect when a worker next reaches unpaused.wait(), so the work between two checks always runs to completion. There are probably various ways to improve this code, depending on what your actual use case is, but hopefully it is enough to get you headed in the right direction.
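
One possible improvement, sketched here under assumptions rather than taken from the original answer: have each task check the event before every unit of work and return a value, so the caller can collect results once the pool is resumed. The names init_worker, process_item, run_demo and the steps parameter are hypothetical, and the arithmetic is only a stand-in for real work.

```python
import time
import multiprocessing

_unpaused = None  # set in each worker process by init_worker


def init_worker(event):
    """Pool initializer: keep the shared Event in a module-level global."""
    global _unpaused
    _unpaused = event


def process_item(item, steps=5):
    """Hypothetical task: `steps` units of work, with a pause check before each."""
    total = 0
    for step in range(steps):
        _unpaused.wait()      # blocks here while the event is cleared (paused)
        total += item * step  # stand-in for one unit of real work
    return total


def run_demo():
    """Start a pool, pause it briefly, resume it, and return the results."""
    event = multiprocessing.Event()
    event.set()  # start unpaused
    pool = multiprocessing.Pool(2, init_worker, (event,))
    try:
        result = pool.map_async(process_item, [1, 2, 3])
        event.clear()    # pause: workers stop at their next wait()
        time.sleep(0.5)
        event.set()      # resume
        return result.get(timeout=30)
    finally:
        event.set()      # never leave a worker blocked on the event
        pool.close()
        pool.join()
```

Calling run_demo() from under an if __name__ == "__main__": guard should return [10, 20, 30]. The more often a task checks the event, the sooner a pause takes effect, at the cost of a little overhead per check.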

License: CC-BY-SA with attribution
Not affiliated with StackOverflow