Question

I'm very new to Map/Reduce principles and to the Python mrjob framework. I wrote this sample code, and it works fine, but I would like to know what I can change in it to make it "perfect" / more efficient.

from mrjob.job import MRJob
import operator
import re

# append result from each reducer 
output_words = []

class MRSudo(MRJob):

    def init_mapper(self):
        # move list of tuples across mapper
        self.words = []

    def mapper(self, _, line):
        command = line.split()[-1]
        self.words.append((command, 1))

    def final_mapper(self):
        for word_pair in self.words:
            yield word_pair

    def reducer(self, command, count): 
        # append tuples to the list
        output_words.append((command, sum(count)))

    def final_reducer(self):
        # Sort tuples in the list by occurrence
        map(operator.itemgetter(1), output_words)
        sorted_words = sorted(output_words, key=operator.itemgetter(1), reverse=True)
        for result in sorted_words:
            yield result

    def steps(self):
        return [self.mr(mapper_init=self.init_mapper,
                        mapper=self.mapper,
                        mapper_final=self.final_mapper,
                        reducer=self.reducer,
                        reducer_final=self.final_reducer)]

if __name__ == '__main__':
    MRSudo.run()

Solution

There are two directions you can take.

1. Improve your process

You are doing a distributed word count. This operation is algebraic, but you are not taking advantage of that property.

For every word of your input you are sending a record to the reducers. These bytes have to be partitioned, sent over the network, and then sorted by the reducer. This is neither efficient nor scalable; the amount of data sent from the mappers to the reducers is usually the bottleneck.

You should add a combiner to your job. It will do exactly the same thing as your current reducer. The combiner runs just after the mapper, in the same address space. This means the amount of data you send over the network is no longer linear in the number of words in your input, but is bounded by the number of unique words, which is usually several orders of magnitude lower.
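As a minimal sketch of this idea (the class name and method bodies are illustrative, not taken from your code), the same command count with a combiner and without any module-level state could look like this:

from mrjob.job import MRJob

class MRCommandCount(MRJob):

    def mapper(self, _, line):
        # emit one (command, 1) record per input line
        yield line.split()[-1], 1

    def combiner(self, command, counts):
        # runs in the mapper's address space and pre-aggregates the
        # local counts, so far fewer records cross the network
        yield command, sum(counts)

    def reducer(self, command, counts):
        # final aggregation of the pre-summed partial counts
        yield command, sum(counts)

if __name__ == '__main__':
    MRCommandCount.run()

With the default method names, no steps() override is needed; mrjob wires the mapper, combiner and reducer together itself.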

Since the distributed word count example is so widely used, you will easily find more information by searching for "distributed word count combiner". Every algebraic operation should have a combiner.

2. Use more efficient tools

MRJob is a great tool for quickly writing map reduce jobs. It is usually faster to write a Python job than a Java one. However, it has a runtime cost:

  1. Python is usually slower than Java
  2. MRJob is slower than most of the Python frameworks because it does not, yet, use typedbytes

You have to decide whether it is worth rewriting some of your jobs in Java using the regular API. If you are writing long-lived batch jobs, it could make sense to invest some development time to decrease the runtime cost.

In the long term, writing a Java job usually does not take much longer than writing it in Python, but you have to make some up-front investments: create a project with a build system, package it, deploy it, etc. With MRJob you just have to execute your Python file.
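For example (the file and path names below are placeholders), a local test run and a Hadoop run differ only in the runner option:

python mr_command_count.py input.log
python mr_command_count.py -r hadoop hdfs:///logs/input.log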

Cloudera benchmarked the Hadoop Python frameworks a few months ago. MRJob was much slower (5 to 7 times) than the equivalent Java jobs. MRJob's performance should improve once typedbytes becomes available, but Java jobs will still be 2 to 3 times faster.

Licensed under: CC-BY-SA with attribution