Your first problem is trying to use timeouts. They're almost never a good idea. They may be a good idea if you can't possibly think of a reliable way to do something efficiently, and you use timeouts only as a first step in checking whether something is really done.
That said, the primary problem is that multiprocessing
is often very bad at reporting exceptions that occur in worker processes. Your code is actually dying here:
node.child1.code.append(node.code + '0')
The error message you're not seeing is "an integer or string of size 1 is required". You can't append a bytearray
to a bytearray
. You want to do :
node.child1.code.extend(node.code + '0')
^^^^^^
instead, and in the similar line for child2
. As is, because the first worker process to take something off the work queue dies, nothing more is ever added to the work queue. That explains everything you've seen - so far ;-)
No timeouts
FYI, the usual approach to avoid timeouts (which are flaky - unreliable) is to put a special sentinel value on a queue. Consumers know it's time to quit when they see the sentinel, and use a plain blocking .get()
to retrieve items from the queue. So first thing is to create a sentinel; e.g., add this near the top:
ALL_DONE = "all done"
Best practice is also to .join()
threads and processes - that way the main program knows (doesn't just guess) when they're done too.
So, you can change the end of _encode_tree()
like so:
for i in range(1, symbol_count + 1):
processed_symbol = results.get()
table.update(processed_symbol)
print "Symbols to process: %d" % (symbol_count - i)
for i in range(mp.cpu_count()):
work.put(ALL_DONE)
worker_pool.join()
messages.put(ALL_DONE)
message_thread.join()
return table
The key here is that the main program knows all the work is done when, and only when, no symbols remain to be processed. Until then, it can unconditionally .get()
results from the results
queue. Then it puts a number of sentinels on the work queue equal to the number of workers. They'll each consume a sentinel and quit. Then we wait for them to finish (worker_pool.join()
). Then a sentinel is put on the message queue, and we wait for that thread to end too. Only then does the function return.
Now nothing ever terminates early, everything is shut down cleanly, and the output of your final table isn't mixed up anymore with various other output from the workers and the message thread. _reporter_thread()
gets rewritten like so:
def _reporter_thread(message_queue):
while True:
message = message_queue.get()
if message == ALL_DONE:
break
else:
print message
and similarly for _encode_proc()
. No more timeouts or try/except Queue.Empty:
fiddling. You don't even have to import Queue
anymore :-)