Question

I'm trying to change the tasks registered on a python-gearman worker during its work cycle. The goal is to have some control over my worker processes so that they can reload their data from a database. Every worker needs to reload at regular intervals, but I don't want to simply kill the processes, and the service has to stay available throughout, so I have to reload in batches: 4 workers reload while another 4 remain available to process jobs, and then the next 4 reload.

Process:

  1. Submit the reload task 4 times. Each worker that picks it up will:
    1. unregister the reload task
    2. reload the dataset
    3. register a finishReload task
    4. return
  2. Repeat step 1 until there are no workers with the reload task registered.
  3. Submit the finishReload task (1) until there are no workers with the finishReload task registered.

(1) The finishReload task unregisters the finishReload task, registers the reload task, and then returns.
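
The batching steps above can be sketched without Gearman at all. This is only a simulation under stated assumptions: `SimWorker` and `rolling_reload` are hypothetical names, the "dataset" is a version counter, and the batch is simply the first 4 workers that still have `reload` registered (a real job server decides which worker receives each job).

```python
class SimWorker(object):
    """Hypothetical stand-in for one worker process; no Gearman involved."""

    def __init__(self, name):
        self.name = name
        self.tasks = {"reload"}      # task names currently registered
        self.dataset_version = 0     # placeholder for the reloaded data

    def reload(self):
        # Steps 1.1-1.4: drop 'reload', refresh the data, expose 'finishReload'.
        self.tasks.discard("reload")
        self.dataset_version += 1
        self.tasks.add("finishReload")

    def finish_reload(self):
        # Footnote (1): swap 'finishReload' back to 'reload'.
        self.tasks.discard("finishReload")
        self.tasks.add("reload")


def rolling_reload(workers, batch_size=4):
    """Reload workers in batches; return the worker names in each batch."""
    batches = []
    while True:
        # Step 2: keep going until no worker has 'reload' registered.
        pending = [w for w in workers if "reload" in w.tasks]
        if not pending:
            break
        batch = pending[:batch_size]
        for w in batch:
            w.reload()
        batches.append([w.name for w in batch])
    # Step 3: run finishReload on every worker that still offers it.
    for w in workers:
        if "finishReload" in w.tasks:
            w.finish_reload()
    return batches


workers = [SimWorker("w%d" % i) for i in range(8)]
batches = rolling_reload(workers)
```

With 8 simulated workers this produces two batches of 4, and every worker ends up with only `reload` registered again.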

Now, the problem that I'm running into is that the job fails when I change the tasks that are available to the worker process. There are no error messages or exceptions, just an "ERROR" in the gearmand log. Here's a quick program that replicates the problem.

WORKER

import gearman

def reversify(gmWorker, gmJob):
        # Return the job payload reversed.
        return "".join(gmJob.data[::-1])

def strcount(gmWorker, gmJob):
        # Unregistering another task while this job is still running
        # is what makes the job fail.
        gmWorker.unregister_task('reversify')  # problem line
        return str(len(gmJob.data))

worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)

# Process jobs forever.
while True:
        worker.work()

CLIENT

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

Please let me know if there is anything that I can clarify.

EDIT: I know that someone will ask to see the log I mentioned. I've posted this question to the gearman group on Google as well, and the log is available there.


Solution

It looks like subclassing the GearmanWorker class and adding a flag works around this issue. The job has to complete before the worker issues new commands to the server; issuing them mid-job seems to interrupt the current job. So if we override the on_job_complete function, we can check the enable/disable flag and act on it after we issue the send_job_complete command. The new worker program follows:

WORKER

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        # Only set the flag; the registration happens after the job completes.
        gmWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        # Request that 'reversify' be unregistered once this job has completed.
        gmWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                # Report the result first, then change the registered tasks.
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if self.enableReversify == -1:
                        self.unregister_task('reversify')
                elif self.enableReversify == 1:
                        self.register_task('reversify', reversify)
                self.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work()
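
If more than one task needs to be toggled, the single flag could be generalized into a queue of pending changes that is drained after the result has been sent. The following is a rough sketch, not part of python-gearman: `DeferredTaskMixin`, `defer_register`, `defer_unregister`, `FakeWorker` and `DemoWorker` are hypothetical names. `FakeWorker` only records calls so the ordering can be checked without a job server; with the real library the mixin would presumably be combined with `gearman.GearmanWorker` instead.

```python
class DeferredTaskMixin(object):
    """Queue task changes during a job; apply them after the result is sent."""

    def _pending_changes(self):
        # Create the queue lazily so no __init__ override is needed.
        if not hasattr(self, "_deferred"):
            self._deferred = []
        return self._deferred

    def defer_register(self, task, callback):
        self._pending_changes().append(("register", task, callback))

    def defer_unregister(self, task):
        self._pending_changes().append(("unregister", task, None))

    def on_job_complete(self, current_job, job_result):
        # Send the result first, then apply the queued changes in order.
        self.send_job_complete(current_job, job_result)
        pending, self._deferred = self._pending_changes(), []
        for action, task, callback in pending:
            if action == "register":
                self.register_task(task, callback)
            else:
                self.unregister_task(task)
        return True


class FakeWorker(object):
    """Hypothetical stand-in for gearman.GearmanWorker; records calls in order."""

    def __init__(self):
        self.calls = []
        self.tasks = {}

    def register_task(self, task, callback):
        self.tasks[task] = callback
        self.calls.append(("register", task))

    def unregister_task(self, task):
        self.tasks.pop(task, None)
        self.calls.append(("unregister", task))

    def send_job_complete(self, job, result):
        self.calls.append(("complete", result))


class DemoWorker(DeferredTaskMixin, FakeWorker):
    pass


def strcount(worker, data):
    # Ask for 'reversify' to be removed, but only after this job is done.
    worker.defer_unregister("reversify")
    return str(len(data))


demo = DemoWorker()
demo.register_task("reversify", lambda worker, data: data[::-1])
demo.register_task("strcount", strcount)
result = demo.tasks["strcount"](demo, "spam and eggs")
demo.on_job_complete("job-1", result)
```

In the recorded calls, the `complete` entry appears before the `unregister` entry, which is the ordering the workaround relies on.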

Other tips

At a quick glance, the problem would seem to be that you are starting a job, then de-registering the worker's ability to do that job from the job server before it's finished.

License: CC-BY-SA with attribution
Not affiliated with StackOverflow