Question

I can't seem to wrap my head around multiprocessing. I'm trying to do a basic operation, but the multiprocessing version seems to take forever.

import multiprocessing, time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print('Tasks Complete')
                self.task_queue.task_done()
                break            
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):
        #Some more work will go in here but for now just return the value
        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print('IN')


if __name__ == '__main__':
    start_time = time.time()
    numberList = []

    for x in range(1000000):
        numberList.append(x) 

    result = []
    counter = 0
    total = 0
    for id in numberList:
        total += id
        counter += 1
    print(counter)
    print("Finished in Seconds: %s" %(time.time()-start_time))
    ###############################################################################################################################
    #Multiprocessing starts here....
    ###############################################################################################################################        
    start_time = time.time()
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() 
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    num_jobs = len(numberList)

    for i in range(num_jobs):
        tasks.put(Task(numberList[i]))

    for i in range(num_consumers):
        tasks.put(None)

    print("So far: %s" %(time.time()-start_time))
    result = []
    while num_jobs:
        result.append(results.get())
        num_jobs -= 1
    print (len(result))
    print("Finished in Seconds: %s" %(time.time()-start_time))

The original script comes from here.

The basic for loop finishes in about 0.4 seconds on average, while the multiprocessing version takes 56 seconds. I would have expected it to be vice versa.

Is there some logic I'm missing, or is it actually slower? If so, how would I structure it so that it's faster than the standard for loop?

Solution

Your multiprocessing code is over-engineered, and it doesn't actually do the work it's supposed to do anyway: each Task just hands its number back, and nothing ever sums the results. I rewrote it to be simpler and to actually compute the sum, and now it's faster than the simple loop:

import multiprocessing
import time


def add_list(l):
    total = 0 
    counter = 0 
    for ent in l:
        total += ent 
        counter += 1
    return (total, counter)

def split_list(l, n):
    # Split `l` into `n` roughly equal sublists by round-robin slicing.
    # Borrowed from http://stackoverflow.com/a/2136090/2073595
    return [l[i::n] for i in range(n)]

if __name__ == '__main__':
    start_time = time.time()
    numberList = list(range(1000000))

    counter = 0 
    total = 0 
    for id in numberList:
        total += id
        counter += 1
    print(counter)
    print(total)
    print("Finished in Seconds: %s" %(time.time()-start_time))
    start_time = time.time()

    num_consumers = multiprocessing.cpu_count() 
    # Split the list up so that each consumer can add up a subsection of the list.
    lists = split_list(numberList, num_consumers)
    p = multiprocessing.Pool(num_consumers)
    results = p.map(add_list, lists)
    total = 0 
    counter = 0 
    # Combine the results each worker returned.
    for t, c in results:
        total += t
        counter += c
    print(counter)
    print(total)

    print("Finished in Seconds: %s" %(time.time()-start_time))

And here's the output:

Standard:
1000000
499999500000
Finished in Seconds: 0.272150039673
Multiprocessing:
1000000
499999500000
Finished in Seconds: 0.238755941391

As @aruisdante noted, you have a very light workload, so the benefits of multiprocessing aren't really felt here. If you were doing heavier processing per task, you'd see a bigger difference, as in the sketch below.
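
To make that concrete, here is a minimal sketch of a heavier workload. The slow_square function and its 10000-iteration inner loop are invented purely for illustration, so treat them as placeholders rather than a benchmark:

import multiprocessing
import time


def slow_square(n):
    # Artificially heavy per-task work: the inner loop is an arbitrary
    # stand-in for real processing, so each task costs far more CPU
    # than the IPC needed to dispatch it.
    total = 0
    for _ in range(10000):
        total += n * n
    return total


if __name__ == '__main__':
    data = list(range(20000))

    start_time = time.time()
    serial = [slow_square(n) for n in data]
    print("Serial: %s" % (time.time() - start_time))

    start_time = time.time()
    p = multiprocessing.Pool(multiprocessing.cpu_count())
    parallel = p.map(slow_square, data)
    p.close()
    p.join()
    print("Parallel: %s" % (time.time() - start_time))

On a multi-core machine the pooled version should now win by roughly the number of cores, because each task carries real CPU work relative to the cost of dispatching it.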

OTHER TIPS

Passing each object from process to process over the queues adds overhead. You have now measured that overhead to be 56 s for one million objects. Passing fewer, larger objects would reduce the overhead but not eliminate it. To benefit from multiprocessing, the computation performed by each task should be relatively heavy compared to the amount of data that needs to be transferred.
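
As a rough sketch of that fewer-but-larger idea, still using the question's queue-based layout, you could put whole chunks on the queue instead of single numbers; the chunk_size of 10000 here is an arbitrary choice you would tune against your real workload:

import multiprocessing


def worker(task_queue, result_queue):
    # Each task is a whole chunk of numbers, so the queue round trip
    # is paid once per chunk rather than once per number.
    while True:
        chunk = task_queue.get()
        if chunk is None:
            task_queue.task_done()
            break
        result_queue.put(sum(chunk))
        task_queue.task_done()


if __name__ == '__main__':
    numbers = list(range(1000000))
    chunk_size = 10000  # arbitrary; tune against the real workload

    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_workers = multiprocessing.cpu_count()
    workers = [multiprocessing.Process(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for w in workers:
        w.start()

    chunks = [numbers[i:i + chunk_size]
              for i in range(0, len(numbers), chunk_size)]
    for chunk in chunks:
        tasks.put(chunk)
    for _ in range(num_workers):
        tasks.put(None)  # one poison pill per worker

    total = sum(results.get() for _ in chunks)
    tasks.join()
    print(total)  # 499999500000

That said, the Pool-based solution above is usually the simpler design; Pool.map can do this batching for you through its chunksize argument.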

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow