Problem

I am writing some code to build a table of variable-length (Huffman) codes, and I wanted to use the multiprocessing module for fun. The idea is to have each process try to get a node from the queue. They do work on the node, and either put that node's two children back into the work queue, or put the variable-length code into the result queue. They also pass messages to a message queue, which should be printed by a thread in the main process. Here is the code so far:

import Queue
import multiprocessing as mp
from threading import Thread
from collections import Counter, namedtuple

Node = namedtuple("Node", ["child1", "child2", "weight", "symbol", "code"])

def _sort_func(node):
    return node.weight

def _encode_proc(proc_number, work_queue, result_queue, message_queue):
    while True:
        try:
            #get a node from the work queue
            node = work_queue.get(timeout=0.1)
            #if it is an end node, add the symbol-code pair to the result queue
            if node.child1 == node.child2 == None:
                message_queue.put("Symbol processed! : proc%d" % proc_number)
                result_queue.put({node.symbol:node.code})
            #otherwise do some work and add some nodes to the work queue
            else:
                message_queue.put("More work to be done! : proc%d" % proc_number)
                node.child1.code.append(node.code + '0')
                node.child2.code.append(node.code + '1')
                work_queue.put(node.child1)
                work_queue.put(node.child2)
        except Queue.Empty: #everything is probably done
            return

def _reporter_thread(message_queue):
    while True:
        try:
            message = message_queue.get(timeout=0.1)
            print message
        except Queue.Empty: #everything is probably done
            return

def _encode_tree(tree, symbol_count):
    """Uses multiple processes to walk the tree and build the huffman codes."""
    #Create a manager to manage the queues, and a pool of workers.
    manager = mp.Manager()
    worker_pool = mp.Pool()
    #create the queues you will be using
    work = manager.Queue()
    results = manager.Queue()
    messages = manager.Queue()
    #add work to the work queue, and start the message printing thread
    work.put(tree)
    message_thread = Thread(target=_reporter_thread, args=(messages,))
    message_thread.start()
    #add the workers to the pool and close it
    for i in range(mp.cpu_count()):
        worker_pool.apply_async(_encode_proc, (i, work, results, messages))
    worker_pool.close()
    #get the results from the results queue, and update the table of codes
    table = {}
    while symbol_count > 0:
        try:
            processed_symbol = results.get(timeout=0.1)
            table.update(processed_symbol)
            symbol_count -= 1
        except Queue.Empty:
            print "WAI DERe NO SYMBOLzzzZzz!!!"
        finally:
            print "Symbols to process: %d" % symbol_count
    return table

def make_huffman_table(data):
    """
    data is an iterable containing the string that needs to be encoded.
    Returns a dictionary mapping symbols to codes.
    """
    #Build a list of Nodes out of the characters in data
    nodes = [Node(None, None, weight, symbol, bytearray()) for symbol, weight in Counter(data).items()]
    nodes.sort(reverse=True, key=_sort_func)
    symbols = len(nodes)
    append_node = nodes.append
    while len(nodes) > 1:
        #make a new node out of the two nodes with the lowest weight and add it to the list of nodes.
        child2, child1 = nodes.pop(), nodes.pop()
        new_node = Node(child1, child2, child1.weight+child2.weight, None, bytearray())
        append_node(new_node)
        #then resort the nodes
        nodes.sort(reverse=True, key=_sort_func)
    top_node = nodes[0]
    return _encode_tree(top_node, symbols)

def chars(fname):
    """
    A simple generator for reading a file character by character
    without loading it entirely into memory.
    """
    f = open(fname)
    char = f.read(1)
    while char != '':
        yield char
        char = f.read(1)
    f.close()
    raise StopIteration

if __name__ == "__main__":
    text = chars("romeo-and-juliet.txt")
    table = make_huffman_table(text)
    print table

The current output of this is:

More work to be done! : proc0
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92
WAI DERe NO SYMBOLzzzZzz!!!
Symbols to process: 92

It just repeats the last bit forever. After the first process adds work to the queue, everything just stops. Why is that? Am I not understanding/using queues properly? Sorry for all the code to read.


Solution

Your first problem is trying to use timeouts. They're almost never a good idea. They may be defensible when you can't think of a reliable way to do something efficiently, and even then only as a first step in checking whether something is really done.
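To see why, here's a minimal sketch (written for Python 3, where the module is spelled queue; the names and delays are illustrative, not from your code): a producer that takes longer than the consumer's timeout makes the consumer conclude, wrongly, that all the work is done.

```python
import queue
import threading
import time

def slow_producer(q):
    # Simulates a worker that needs longer than the consumer's timeout.
    time.sleep(0.5)
    q.put("late item")

q = queue.Queue()
producer = threading.Thread(target=slow_producer, args=(q,))
producer.start()

consumed = []
while True:
    try:
        consumed.append(q.get(timeout=0.1))
    except queue.Empty:
        break  # "everything is probably done" -- except it isn't

producer.join()
print(consumed)  # the late item is still sitting, unseen, in the queue
```

That's exactly the race your workers hit: a 0.1-second timeout guesses about completion instead of knowing.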

That said, the primary problem is that multiprocessing is often very bad at reporting exceptions that occur in worker processes. Your code is actually dying here:

node.child1.code.append(node.code + '0')

The error message you're not seeing is "an integer or string of size 1 is required". You can't append a bytearray to a bytearray - .append() takes a single byte. You want:

node.child1.code.extend(node.code + '0')
                 ^^^^^^

instead, and in the similar line for child2. As is, because the first worker process to take something off the work queue dies, nothing more is ever added to the work queue. That explains everything you've seen - so far ;-)
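You can see the difference in isolation (sketched here with Python 3 bytes literals; under Python 2, the str '0' used above behaves the same way with .extend()):

```python
buf = bytearray(b"10")

# .append() takes a single byte, so appending another bytearray fails:
try:
    buf.append(bytearray(b"10") + b"0")
except TypeError:
    pass  # e.g. "an integer is required" (exact wording varies by Python version)

# .extend() accepts any iterable of bytes, which is what the tree walk needs:
buf.extend(bytearray(b"10") + b"0")
print(buf)  # bytearray(b'10100')
```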

No timeouts

FYI, the usual approach to avoid timeouts (which are flaky - unreliable) is to put a special sentinel value on a queue. Consumers know it's time to quit when they see the sentinel, and use a plain blocking .get() to retrieve items from the queue. So the first thing is to create a sentinel; e.g., add this near the top:

ALL_DONE = "all done"

Best practice is also to .join() threads and processes - that way the main program knows (doesn't just guess) when they're done too.

So, you can change the end of _encode_tree() like so:

for i in range(1, symbol_count + 1):
    processed_symbol = results.get()
    table.update(processed_symbol)
    print "Symbols to process: %d" % (symbol_count - i)
for i in range(mp.cpu_count()):
    work.put(ALL_DONE)
worker_pool.join()
messages.put(ALL_DONE)
message_thread.join()
return table

The key here is that the main program knows all the work is done when, and only when, no symbols remain to be processed. Until then, it can unconditionally .get() results from the results queue. Then it puts a number of sentinels on the work queue equal to the number of workers. They'll each consume a sentinel and quit. Then we wait for them to finish (worker_pool.join()). Then a sentinel is put on the message queue, and we wait for that thread to end too. Only then does the function return.

Now nothing ever terminates early, everything is shut down cleanly, and the output of your final table isn't mixed up anymore with various other output from the workers and the message thread. _reporter_thread() gets rewritten like so:

def _reporter_thread(message_queue):
    while True:
        message = message_queue.get()
        if message == ALL_DONE:
            break
        else:
            print message

and similarly for _encode_proc(). No more timeouts or try/except Queue.Empty: fiddling. You don't even have to import Queue anymore :-)

License: CC-BY-SA with attribution
Not affiliated with StackOverflow