Question

If the operation performed with MapReduce is not commutative and associative, then the combiner cannot be the same as the reducer.

For example, when calculating an average, the combiner sums the values for a key, while the reducer sums them and then divides the sum by the total number of values for that key. The combiner's code differs only slightly. What if you could use the same class for both the combiner and the reducer, with a piece of code that determines whether the current task is a combiner or a reducer? If it finds that it is a reducer, it divides the sum by the count.

Something like this:

protected void reduce(Text keyIn, Iterable<PairWritable> valuesIn,
    Context context)
    throws IOException, InterruptedException {
  double sum = 0.0d;
  long count = 0L;

  // Merge the partial (sum, count) pairs received for this key.
  for (PairWritable valueIn : valuesIn) {
    sum += valueIn.getSum();
    count += valueIn.getCount();
  }

  // THIS_IS_A_REDUCER is a placeholder for the check I am asking about.
  if (THIS_IS_A_REDUCER) {
    sum /= count;
  }

  context.write(keyIn, new PairWritable(sum, count));
}

Is it possible to do this? Can the THIS_IS_A_REDUCER placeholder above be replaced with something real?

I can determine whether a task is a mapper or a reducer from the task attempt ID string, but combiners and reducers seem to have similar string patterns.


Solution

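I suppose you could interrogate the Context object and get the task ID. Once you have the ID, a map task (including a combiner running on the map side) will have an "m" in the name, while a reduce task will have an "r" in the name. Be aware that Hadoop may also run the combiner on the reduce side while merging map outputs, so an "r" alone does not guarantee that the code is running as the real reducer.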

To get the task attempt ID, use .getTaskAttemptID(). I think calling context.getTaskAttemptID() should work, but I can't test it to be sure.
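As a rough sketch of that check, the snippet below pulls the task-type letter out of an attempt ID string. It assumes the usual six-field layout, as in attempt_200707121733_0003_m_000005_0. The class TaskTypeSketch and the helper taskTypeOf are hypothetical names, and the code works on a plain string so it runs without Hadoop on the classpath. In a real job you would presumably feed it context.getTaskAttemptID().toString().

```java
public class TaskTypeSketch {

    /**
     * Returns the task-type letter ('m' or 'r') from an attempt ID such as
     * attempt_200707121733_0003_m_000005_0. Hypothetical helper, not a Hadoop API.
     */
    public static char taskTypeOf(String attemptId) {
        String[] parts = attemptId.split("_");
        // Assumed layout: "attempt", cluster id, job number, type, task number, attempt number.
        if (parts.length != 6 || !parts[0].equals("attempt") || parts[3].length() != 1) {
            throw new IllegalArgumentException("Unexpected attempt ID: " + attemptId);
        }
        return parts[3].charAt(0);
    }

    public static void main(String[] args) {
        System.out.println(taskTypeOf("attempt_200707121733_0003_m_000005_0")); // prints m
        System.out.println(taskTypeOf("attempt_200707121733_0003_r_000001_0")); // prints r
    }
}
```

This only tells you which kind of task the code is running in, not whether the class was invoked as a combiner.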

OTHER TIPS

This is a flawed question. Whenever you find that you need to differentiate which reduce() a task calls, write a separate combiner class. For example, you write:

public static class Combine extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    // Combiner logic goes here: emit partial results only.
    public void reduce(Text key, Iterator<Text> message, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {}
}

public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    // Reducer logic goes here: produce the final result.
    public void reduce(Text key, Iterator<Text> message, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {}
}

Then, in main(), you write:

conf.setReducerClass(Reduce.class);
conf.setCombinerClass(Combine.class);
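For the averaging case from the question, the split between the two classes might look like the sketch below. It is plain Java with no Hadoop types, so it can be run on its own. SeparateCombinerSketch, Pair, combine and reduce are hypothetical names standing in for the Combine and Reduce classes above. The point it tries to show is that the combiner only merges (sum, count) pairs and never divides, so the result stays correct whether the combiner runs zero, one or several times.

```java
import java.util.Arrays;
import java.util.List;

public class SeparateCombinerSketch {

    /** Partial aggregate: a running sum and the number of values behind it. */
    public static final class Pair {
        public final double sum;
        public final long count;

        public Pair(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Combiner step: merge partial pairs; output has the same shape as the input. */
    public static Pair combine(List<Pair> partials) {
        double sum = 0.0;
        long count = 0L;
        for (Pair p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return new Pair(sum, count);
    }

    /** Reducer step: merge the same way, then divide exactly once. */
    public static double reduce(List<Pair> partials) {
        Pair total = combine(partials);
        return total.sum / total.count;
    }

    public static void main(String[] args) {
        // One map task saw 1, 2 and 3 and ran the combiner on them.
        Pair a = combine(Arrays.asList(new Pair(1, 1), new Pair(2, 1), new Pair(3, 1)));
        // Another map task saw only 10 and the combiner did not run.
        Pair b = new Pair(10, 1);
        System.out.println(reduce(Arrays.asList(a, b))); // prints 4.0
    }
}
```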

While I know this question is already solved, I have another solution. What I did was make my Combiner a subclass of the Reducer. Then in the Reducer code I was able to test if I am the Combiner subclass or not.

The main benefit of this was that I needed to modify my key during the Reducer step but didn't want to change it during the combining step (otherwise the same transformation would have been applied twice). Other than that, 95% of the code was identical.
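A minimal sketch of the subclass idea, applied to the averaging example from the question, could look like this. The answer does not say how the test was done, so the overridable isCombiner() hook here is an assumption (a check such as this instanceof AvgCombiner would be another option). All class and method names are hypothetical, and the code is plain Java rather than real Hadoop Reducer subclasses.

```java
import java.util.Arrays;
import java.util.List;

public class SubclassCombinerSketch {

    /** Partial aggregate: a running sum and the number of values behind it. */
    public static final class Pair {
        public final double sum;
        public final long count;

        public Pair(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Reducer: merges the pairs and, unless it is the combiner subclass, divides. */
    public static class AvgReducer {
        /** Hook overridden by the combiner subclass (an assumption of this sketch). */
        protected boolean isCombiner() {
            return false;
        }

        public Pair reduce(List<Pair> partials) {
            double sum = 0.0;
            long count = 0L;
            for (Pair p : partials) {
                sum += p.sum;
                count += p.count;
            }
            if (!isCombiner()) {
                sum /= count; // final step only: turn the sum into the average
            }
            return new Pair(sum, count);
        }
    }

    /** Combiner: inherits all of the shared code and only flips the hook. */
    public static class AvgCombiner extends AvgReducer {
        @Override
        protected boolean isCombiner() {
            return true;
        }
    }

    public static void main(String[] args) {
        Pair partial = new AvgCombiner().reduce(Arrays.asList(new Pair(2, 1), new Pair(4, 1)));
        Pair result = new AvgReducer().reduce(Arrays.asList(partial, new Pair(9, 1)));
        System.out.println(result.sum + " over " + result.count + " values"); // prints 5.0 over 3 values
    }
}
```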

Licensed under: CC-BY-SA with attribution