Question

I am trying to yield the probability each key,value pair generated from mapper has.

So, lets say mapper yields:

a, (r, 5)
a, (e, 6)
a, (w, 7)

I need to add 5+6+7 = 18 and then find probabilities 5/18, 6/18, 7/18

so the final output from the reducer would look like:

a, [[r, 5, 0.278], [e, 6, 0.333], [w, 7, 0.389]]

So far, I can only get the reducer to sum all the integers in the values. How can I make it go back and divide each instance by the total sum?

thanks!


Solution

Pai's solution is technically correct, but in practice this will give you a lot of strife, as setting the partitioning can be a big pain (see https://groups.google.com/forum/#!topic/mrjob/aV7bNn0sJ2k).

You can achieve this task more easily by using mrjob.step, and then creating two reducers, such as in this example: https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_next_word_stats.py

To do it in the vein you're describing:

from collections import defaultdict
import re

from mrjob.job import MRJob

wordRe = re.compile(r"\w+")

class MRComplaintFrequencyCount(MRJob):

    def mapper(self, _, line):
        self.increment_counter('group', 'num_mapper_calls', 1)

        # The complaint issue is the fourth field (index 3) in the CSV;
        # a plain split assumes no quoted commas in earlier fields
        issue = line.split(",")[3]

        for word in wordRe.findall(issue):
            # One count per word; the combiner below re-keys these under None
            yield word.lower(), 1

    def combiner(self, key, values):
        self.increment_counter('group', 'num_combiner_calls', 1)
        # Pre-aggregate this word's counts, then re-key under None so
        # every word ends up at the same reducer
        yield None, (key, sum(values))

    def reducer(self, key, values):
        self.increment_counter('group', 'num_reducer_calls', 1)
        wordCounts = defaultdict(int)
        total = 0
        for word, count in values:
            total += count
            wordCounts[word] += count

        for word, count in wordCounts.items():
            # word, frequency, relative frequency
            yield word, (count, float(count) / total)


if __name__ == '__main__':
    MRComplaintFrequencyCount.run()

This does a standard word count and aggregates mostly in the combiner, which then uses None as the common key, so every word is indirectly sent to the same reducer. There you can compute the total word count and each word's relative frequency.
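
If you save this as, say, mr_complaint_frequency_count.py (the filename and the sample values below are illustrative, not from the original post), you can run it locally and read the results back through mrjob's classic runner API:

mr_job = MRComplaintFrequencyCount(args=['complaints.csv'])
with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        word, stats = mr_job.parse_output_line(line)
        print(word, stats)  # e.g. billing [42, 0.021]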

OTHER TIPS

What you are doing above should work as well, but it assumes that all of the data for a single key fits in memory. If it does, the reducer can hold all values in memory, compute the total, and then calculate the marginal for each key-value pair. This is commonly known as the "stripes" approach; a sketch follows.
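
A minimal Python sketch of such a stripes-style reducer, assuming each incoming value is a (word, count) pair as in the question:

def reducer(self, key, values):
    # Stripes: buffer all of this key's data in memory
    counts = {}
    for word, count in values:
        counts[word] = counts.get(word, 0) + count
    total = sum(counts.values())
    for word, count in counts.items():
        # word, frequency, relative frequency
        yield key, (word, count, float(count) / total)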

However, much of the time this is not true and the data will not fit in memory. In that case you have to find a way to send the values needed for the total ahead of the actual key-value pairs, so that the total is already available to compute each marginal and emit the value right away.

This is a candidate for the "order inversion" design pattern. It's useful when you need to calculate relative frequencies. The basic idea is that the mapper emits two key-value pairs for each piece of intermediate data, where one of the pairs carries a common key shared by all values; that pair is used to calculate the total. A mapper sketch follows the example below.

Example:

For a, (r, 5) :
---------------
emit (a, r), 5
emit (a, *), 5


For a, (e, 6) :
---------------
emit (a, e), 6
emit (a, *), 6


For a, (w, 7) :
---------------
emit (a, w), 7
emit (a, *), 7

Once this is done, you need a partitioner that partitions each intermediate key-value pair using only the first part of the key ("a" in the example above).

You also need a key sort order that always places keys having * in the second part before all other keys sharing the same first part. Conceptual sketches of both rules follow.
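
In Hadoop these are a custom Partitioner and a sort comparator; framework plumbing aside, a conceptual Python sketch of the two rules:

def partition(key, num_reducers):
    # route on the first part of the key only,
    # so (a, *) and (a, r) reach the same reducer
    return hash(key[0]) % num_reducers

def sort_key(key):
    # keys whose second part is '*' sort before all
    # other keys with the same first part
    return (key[0], key[1] != '*', key[1])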

This way, all intermediate keys that have "a" in the first part end up in the same reducer, sorted as shown below:

(a, *), 5
(a, *), 6
(a, *), 7
(a, e), 6
(a, r), 5
(a, w), 7

At the reducer, as you iterate through the key-value pairs, simply accumulate the values from keys that have * in the second part. The accumulated total can then be used to calculate the marginal for every other key-value pair:

total = 0
for (key, value) in sorted_pairs:
    if key.second == '*':
        total += value        # the (a, *) pairs arrive first
    else:
        emit(key.first, (key.second, value, value / total))
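
To make this concrete, here is a plain-Python simulation on the question's numbers, assuming the pairs have already been partitioned and sorted as described:

pairs = [(('a', '*'), 5), (('a', '*'), 6), (('a', '*'), 7),
         (('a', 'e'), 6), (('a', 'r'), 5), (('a', 'w'), 7)]

total = 0
for key, value in pairs:       # the (a, *) marginals arrive first
    if key[1] == '*':
        total += value         # total is 18 after the three marginals
    else:
        print(key[0], key[1], value, float(value) / total)
# a e 6 0.333..., a r 5 0.277..., a w 7 0.388...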

This design pattern, commonly known as order inversion, uses the pairs approach. For more on this and other design patterns, I suggest reading the chapter on MapReduce design patterns in this book: http://lintool.github.com/MapReduceAlgorithms/. It is very well explained with examples.

You can simply compute the sum, as you already do, while also keeping the pairs in memory, and then emit the probabilities you want, as follows:

reduce(key, values):
    list pairs = values.toList(); // cache the pairs: the values iterator can only be consumed once
    int sum = 0;
    for (value in pairs) {
        sum = sum + value.frequency; // assuming each value has two fields: value.word and value.frequency
    }
    String outputValue = "[";
    for (value in pairs) { // iterate over the cached pairs once more
        outputValue = outputValue + "[" + value.word + ", " + value.frequency + ", " + value.frequency/sum + "],";
    }
    outputValue = outputValue.replaceLast(",", "]");
    emit(key, outputValue);

Of course, that's only pseudocode, as I am not used to Python, but the translation should be quite easy; a sketch follows.
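
For reference, the same two-pass idea as a Python/mrjob-style reducer, assuming each value is a (word, count) pair and, as noted above, that all pairs for the key fit in memory:

def reducer(self, key, values):
    pairs = list(values)            # cache: the values iterator is single-pass
    total = 0
    for word, count in pairs:       # pass 1: compute the total
        total += count
    output = []
    for word, count in pairs:       # pass 2: attach relative frequencies
        output.append([word, count, round(float(count) / total, 3)])
    yield key, output               # e.g. a, [[r, 5, 0.278], [e, 6, 0.333], [w, 7, 0.389]]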

Licensed under: CC-BY-SA with attribution