Why isn't a multiprocessing implementation of a hashing function much faster than a serial one in Python?

StackOverflow https://stackoverflow.com/questions/22738922

  •  23-06-2023
  •  | 
  •  

Question

Take the example of this code:

def get_hash(path, hash_type='md5'):
    func = getattr(hashlib, hash_type)()
    f = os.open(path, (os.O_RDWR | os.O_BINARY))
    for block in iter(lambda: os.read(f, 1024*func.block_size), b''):
        func.update(block)
    os.close(f)
    return func.hexdigest()

This function returns the md5sum of any file. Let's say I have a directory with more than 30 files and I want to run the hashing function on each file:

def hasher(path=some_path):
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            path = os.path.join(root, name)
            yield get_hash(path)
@some_timer_decorator
... some testing function here ...

test1 took 4.684999942779541 seconds.

Now, as you can see, the situation at hand gives me the opportunity to 'exploit' the hasher function and add multiprocessing:

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            full_name = os.path.join(root, name)
            yield p.apply_async(get_hash, (full_name,)).get()
@some_timer_decorator
... some other testing function here ...

test2 took 4.781000137329102 seconds.

The outputs are identical. I was expecting the paralleled version to be much faster, since most of the files are smaller than <20MB and the hasher function calculates those sums very very fast (in general, for files of that size). Is there something wrong with my implementation? If there's nothing wrong with it, is there a faster approach to the same problem?

#

This is the decorator function that I used to measure the execution time:

def hasher_time(f):
        def f_timer(*args, **kwargs):
            start = time.time()
            result = f(*args, **kwargs)
            end = time.time()
            print(f.__name__, 'took', end - start, 'seconds')
            return result
        return f_timer
#
Was it helpful?

Solution

You are pushing jobs out then wait for them to complete:

yield p.apply_async(get_hash, (full_name,)).get()

The AsyncResult.get() method blocks until the job is done, you are effectively running your jobs sequentially.

Collect the jobs, poll them with AsyncResult.ready() until they are done, then .get() the results.

Better still, push all jobs into the pool with .async_apply() calls, then close the pool, call .join() (which blocks until all jobs are done), then retrieve results with .get():

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    jobs = []
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            full_name = os.path.join(root, name)
            jobs.append(p.apply_async(get_hash, (full_name,)))
    p.close()
    p.join()  # wait for jobs to complete
    for job in jobs:
        yield job.get()

You could use the Pool.imap() method to simplify the code somewhat; it'll yield results as they come available:

def hasher_parallel(path=PATH):
    p = multiprocessing.Pool(3)
    filenames = (os.path.join(root, name)
        for root, dirs, files in os.walk(path, topdown=False)
        for name in files)
    for result in p.imap(get_hash, filenames):
        yield result

but do experiment with the chunksize parameter and the unordered variant as well.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top