Question

As the title implies, I have a deadlock and no idea why. I have multiple producers and only one consumer. The schedule_task method will get called by multiple processes after the thread has called the get method of the queue:

from logging import getLogger
from time import sleep
from threading import Event, Thread
from multiprocessing import Process
from Queue import Queue

logger = getLogger(__name__)


class TaskExecutor(object):
    def __init__(self):
        print("init taskExecutor")
        self.event = Event()
        self.taskInfos = Queue()
        task_thread = Thread(target=self._run_worker_thread)
        self._instantEnd = False
        self._running = True
        task_thread.daemon = True
        task_thread.start()

    def _run_worker_thread(self):
        print("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                print("try to get queued task from %s" % str(self.taskInfos))
                msg, task = self.taskInfos.get()
                print("got task %s for msg: %s" % str(task), str(msg))
                task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                logger.error("Error: %s" % e.message)
        print("shutting down TaskExecutor!")

    def is_running(self):
        return True

    def schedule_task(self, msg, task):
        try:
            print("appending task '%s' for msg: %s" % (str(task), str(msg)))
            self.taskInfos.put((msg, task))
            print("into queue: %s " % str(self.taskInfos))
        except Exception, e:
            print("queue is probably full: %s" % str(e))


class Task(object):

    def execute(self, msg):
        print(msg)


executor = TaskExecutor()

def produce():
    cnt = 0
    while True:
        executor.schedule_task("Message " + str(cnt), Task())
        cnt += 1
        sleep(1)

for i in range(4):
    p = Process(target=produce)
    p.start()

From my logs I get:

init taskExecutor
start running taskExecutor worker Thread
try to get queued task from <Queue.Queue instance at 0x7fdd09830cb0>
 appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd098f8f10>' for msg: Message 0
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f35d0>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3490>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 
appending task '<__main__.Task object at 0x7fdd086f3b10>' for msg: Message 1
into queue: <Queue.Queue instance at 0x7fdd09830cb0> 

Can someone please explain what I am missing?

Solution

While it's not possible for other people to run this code (it's not self-contained), there's no obvious problem in what you showed. So the problem is in something you haven't shown - perhaps in the code creating and using instances of TaskExecutor.

And when I plugged in missing pieces I made up out of thin air, this code worked fine.

So you need to show more than just this. How about replacing:

logger.debug("try to get queued task")

with

logger.debug("try to get queued task from queue %s", self.taskInfos)

? Then at least we could see whether your producers are using the same queue as your consumer.

Next

Thanks for adding that. Next up: here's a self-contained program for you to try. It's very much like your code. See whether it runs correctly for you (it does for me):

from threading import Thread, Lock
from Queue import Queue

class Logger:
    def __init__(self):
        self.iolock = Lock()

    def debug(self, fmt, *msg):
        # a lock serializes prints so output from different threads doesn't interleave
        with self.iolock:
            print(fmt % msg)

    error = debug

logger = Logger()

class TaskExecutor(object):
    def __init__(self):
        logger.debug("init taskExecutor")
        self.taskInfos = Queue()
        task_thread = Thread(target=self._run_worker_thread)
        task_thread.daemon = True
        task_thread.start()

    def is_running(self):
        return True

    def _run_worker_thread(self):
        logger.debug("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                logger.debug("try to get queued task from queue %s", self.taskInfos)
                msg, task = self.taskInfos.get()
                logger.debug("got task %s for msg: %s", str(task), str(msg))
                #task.execute(msg)  # skipped here: this test queues ints, not Task objects
                self.taskInfos.task_done()
            except Exception, e:
                logger.error("Error: %s", e.message)
        logger.debug("shutting down TaskExecutor!")

    def schedule_task(self, msg, task):
        try:
            logger.debug("appending task '%s' for msg: %s", str(task), str(msg))
            self.taskInfos.put((msg, task))
            logger.debug("into queue: %s ", str(self.taskInfos))
        except Exception, e:
            logger.debug("queue is probably full: %s", str(e))

te = TaskExecutor()

def runit():
    for i in range(10):
        te.schedule_task("some task", i)

main = Thread(target=runit)
main.start()

Next

OK, this code can't possibly work. On a Linux-y system, exactly one instance of TaskExecutor is created, here:

executor = TaskExecutor()

That happens in the main program. Each time you do:

p = Process(target=produce)

your main program is fork()'ed. While the forked processes also see executor, that's an address-space copy of the main program's executor, and has nothing whatsoever to do with the executor in the main program (the usual copy-on-write fork() semantics).

Each child process also has a copy of executor's data members, including its Queue. All the child processes put data on their own unique copy of executor, but the consumer thread is running only in the main program, and nothing worker processes do to their copy of executor can have any effect on what the main program's consumer thread sees.
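
To make that concrete, here's a minimal sketch of those copy semantics (the names are made up for illustration; on Windows the child is spawned rather than forked, but the outcome is the same):

from multiprocessing import Process

counter = [0]  # a module-level object, like executor in the question

def child():
    counter[0] += 1  # mutates the child's private copy only
    print("child sees counter = %s" % counter[0])  # prints 1

if __name__ == "__main__":
    p = Process(target=child)
    p.start()
    p.join()
    print("parent still sees counter = %s" % counter[0])  # prints 0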

So this is really confused. I have to stop now to try to figure out what you might really want to be doing here ;-) If you want to play with ideas, investigate using a multiprocessing.Queue. The only way to communicate between processes is to use objects that are built from the ground up to support inter-process communication. Queue.Queue will never work for that.
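
For contrast, here's a minimal sketch (not from the original answer) showing that a multiprocessing.Queue handed to a child process really does carry data back to the parent:

from multiprocessing import Process, Queue  # note: not Queue.Queue

def produce(q):
    q.put("hello from the child")  # runs in the child process

if __name__ == "__main__":
    q = Queue()
    p = Process(target=produce, args=(q,))
    p.start()
    print(q.get())  # the parent receives "hello from the child"
    p.join()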

And one more

Here's one that works fine across processes, and even on Windows ;-)

from time import sleep
from threading import Thread
from multiprocessing import Process, JoinableQueue

class TaskExecutor(Thread):
    def __init__(self):
        print("init taskExecutor")
        Thread.__init__(self)
        self.taskInfos = JoinableQueue()

    def getq(self):
        return self.taskInfos

    def run(self):
        print("start running taskExcecutor worker Thread")
        while self.is_running():
            try:
                print("try to get queued task from %s" % self.taskInfos)
                msg, task = self.taskInfos.get()
                print("got task %s for msg: %s" % (task, msg))
                task.execute(msg)
                self.taskInfos.task_done()
            except Exception, e:
                print("Error: %s" % e.message)
        print("shutting down TaskExecutor!")

    def is_running(self):
        return True

class Task(object):
    def execute(self, msg):
        print(msg)

def produce(q):
    cnt = 0
    while True:
        q.put(("Message " + str(cnt), Task()))
        cnt += 1
        sleep(1)

if __name__ == "__main__":
    executor = TaskExecutor()
    executor.start()
    for i in range(4):
        p = Process(target=produce, args=(executor.getq(),))
        p.start()

The if __name__ == "__main__" part doesn't just allow this to run on Windows; it also has great "documentation" value, making it obvious at a glance that executor runs only in the main program.

A question for you is whether this is the division of labor you want, though. Do you really want the main process - and only the main process - to do all the

   task.execute(msg)

work? No way from here to guess whether that's what you want, but it is what the code does.
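
If it isn't - say you want the child processes to do the execute() work themselves - you could invert the roles: the main program produces, and each child runs the consumer loop. A rough sketch under that assumption (the consume function here is invented for illustration):

from multiprocessing import Process, JoinableQueue

class Task(object):  # same Task as above
    def execute(self, msg):
        print(msg)

def consume(q):
    # each child process pulls tasks off the shared queue and executes them itself
    while True:
        msg, task = q.get()
        task.execute(msg)
        q.task_done()

if __name__ == "__main__":
    q = JoinableQueue()
    for i in range(4):
        p = Process(target=consume, args=(q,))
        p.daemon = True  # let the main program exit once the queue drains
        p.start()
    for cnt in range(10):
        q.put(("Message " + str(cnt), Task()))
    q.join()  # block until every task has been marked done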

Style point: note that both versions above get rid of the schedule_task() method. Parallel processing can be difficult, and over the decades I've found it extremely valuable to keep inter-thread/inter-process communication as simple and dead obvious as possible. That means, among other things, using message queues directly instead of, e.g., hiding them in methods. Layers of abstraction in this context often make correct code harder to create, extend, debug and maintain.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow