Question

I have a thread that should wait for tasks arriving from multiple other threads and execute them until no task is left. If no task is left, it should wait again.

I tried it with this class (only relevant code):

import logging
from threading import Event, Thread

logger = logging.getLogger(__name__)

class TaskExecutor(object):
    def __init__(self):
        self.event = Event()
        self.taskInfos = []
        self._running = True

        task_thread = Thread(target=self._run_worker_thread)
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        while self.is_running():
            if len(self.taskInfos) == 0:
                self.event.clear()
                self.event.wait()

            try:
                msg, task = self.taskInfos[0]
                del self.taskInfos[0]
                if task:
                    task.execute(msg)
            except Exception as e:
                logger.error("Error " + str(e))

    def schedule_task(self, msg, task):
        self.taskInfos.append((msg, task))
        self.event.set()

Multiple threads call schedule_task whenever they want to add a task.

The problem is that I sometimes get an error saying list index out of range, raised by the msg, task = self.taskInfos[0] line. The del self.taskInfos[0] right below it is the only place where I delete a task.

How can that happen? I feel like I have to synchronize everything, but Python has no such keyword, and reading the docs is what led me to this pattern.


Solution

This code is pretty hopeless - give up on it and do something sane ;-) What's sane? Use a Queue.Queue. That's designed to do what you want.

Replace:

    self.event = Event()
    self.taskInfos = []

with:

    self.taskInfos = Queue.Queue()

(of course you have to import Queue too).

To add a task:

    self.taskInfos.put((msg, task))

To get a task:

    msg, task = self.taskInfos.get()

That will block until a task is available. There are also options to do a non-blocking .get() attempt, and to do a .get() attempt with a timeout (read the docs).
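Putting those pieces together, here is a minimal sketch of the whole class rebuilt around Queue.Queue (assuming Python 2, where the module is named Queue; in Python 3 it is queue). It reuses the names from the question, and the logger setup is added only so the snippet is self-contained:

    import logging
    import Queue  # named "queue" in Python 3
    from threading import Thread

    logger = logging.getLogger(__name__)

    class TaskExecutor(object):
        def __init__(self):
            # The Queue replaces both the Event and the plain list.
            self.taskInfos = Queue.Queue()

            task_thread = Thread(target=self._run_worker_thread)
            task_thread.daemon = True
            task_thread.start()

        def _run_worker_thread(self):
            while True:
                # Blocks until a producer puts a task; pass block=False or a
                # timeout to get() if you need the non-blocking variants.
                msg, task = self.taskInfos.get()
                try:
                    if task:
                        task.execute(msg)
                except Exception as e:
                    logger.error("Error " + str(e))

        def schedule_task(self, msg, task):
            self.taskInfos.put((msg, task))

The Queue does the locking, blocking and waking internally, so the explicit Event (and the races that came with it) disappears entirely.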

Trying to fix the code you have would be a nightmare. At heart, Events are not powerful enough to do what you need for thread safety in this context. In fact, any time you see code doing Event.clear(), it's probably buggy (subject to races).

Edit: what will go wrong next

If you continue trying to fix this code, this is likely to happen next:

    the queue is empty
    thread 1 does len(self.taskInfos) == 0, and loses its timeslice
    thread 2 does self.taskInfos.append((msg, task))
             and does self.event.set()
             and loses its timeslice
    thread 1 resumes and does self.event.clear()
             and does self.event.wait()

Oops! Now thread 1 waits forever, despite that a task is on the queue.
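Here is a contrived reproduction of that lost wakeup; the sleep() calls exist only to force the interleaving from the trace above, and none of this is part of the original code:

    import time
    from threading import Event, Thread

    event = Event()
    tasks = []

    def consumer():                  # plays the part of thread 1
        if len(tasks) == 0:
            time.sleep(0.1)          # "loses its timeslice" here
            event.clear()            # wipes out the set() the producer just did
            event.wait(timeout=2)    # would block forever without the timeout
        print("consumer waited even though %d task(s) were queued" % len(tasks))

    def producer():                  # plays the part of thread 2
        time.sleep(0.05)
        tasks.append(("msg", "task"))
        event.set()                  # this wakeup is lost

    c = Thread(target=consumer)
    p = Thread(target=producer)
    c.start(); p.start()
    c.join(); p.join()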

That's why Python supplies Queue.Queue. You're exceedingly unlikely to get a correct solution using a feeble Event.

Other tips

The following sequence is possible (assume Thread #0 is a consumer that runs your _run_worker_thread method, and Thread #1 and Thread #2 are producers that call the schedule_task method):

  • Thread #0 waits on the event, queue is empty
  • Thread #1 calls schedule_task and is preempted before set
  • Thread #2 calls schedule_task and reaches set
  • Thread #0 awakens and does two iterations, clearing the task queue
  • Thread #1 awakens and reaches the set call
  • Thread #0 awakens with incorrect state - the queue is empty

The interleaving above is the key to understanding the race. Basically, the worker thread can consume all the tasks (its if len(self.taskInfos) == 0 check staying False while it spins) before all of the producers have managed to set the event after appending to the list. The leftover set() then wakes the worker while the list is already empty, and its self.taskInfos[0] access raises the IndexError you are seeing.

Possible solutions include re-checking the condition after wait (as suggested in the comments by xndrme) or using the Lock class, the best one probably being the Queue.Queue class mentioned by Tim Peters in his answer.
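For completeness, here is a minimal sketch of the "re-check after wait" approach using threading.Condition, which is not mentioned above but is essentially a Lock bundled with exactly that wait-and-re-check pattern:

    from threading import Condition, Thread

    class TaskExecutor(object):
        def __init__(self):
            self.cond = Condition()
            self.taskInfos = []

            task_thread = Thread(target=self._run_worker_thread)
            task_thread.daemon = True
            task_thread.start()

        def _run_worker_thread(self):
            while True:
                with self.cond:
                    # Re-check every time wait() returns; a plain "if"
                    # here is exactly the race described above.
                    while not self.taskInfos:
                        self.cond.wait()
                    msg, task = self.taskInfos.pop(0)
                if task:
                    task.execute(msg)

        def schedule_task(self, msg, task):
            with self.cond:
                self.taskInfos.append((msg, task))
                self.cond.notify()

Even so, Queue.Queue remains the simpler choice, since it packages this pattern (and more) behind two method calls.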

Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow