Question

When running a MapReduce job with a specified combiner, is the combiner run during the sort phase? I understand that the combiner is run on mapper output for each spill, but it seems like it would also be beneficial to run during intermediate steps when merge sorting. I'm assuming here that in some stages of the sort, mapper output for some equivalent keys is held in memory at some point.

If this doesn't currently happen, is there a particular reason, or just something which hasn't been implemented?

Thanks in advance!

Was it helpful?

Solution

Combiners are there to save network bandwidth.

The mapoutput directly gets sorted:

sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);

This happens right after the real mapping is done. During iteration through the buffer it checks if there has a combiner been set and if yes it combines the records. If not, it directly spills onto disk.

The important parts are in the MapTask, if you'd like to see it for yourself.

    sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
    // some fields
    for (int i = 0; i < partitions; ++i) {
        // check if configured
        if (combinerRunner == null) {
          // spill directly
        } else {
            combinerRunner.combine(kvIter, combineCollector);
        }
    }

This is the right stage to save the disk space and the network bandwidth, because it is very likely that the output has to be transfered. During the merge/shuffle/sort phase it is not beneficial because then you have to crunch more amounts of data in comparision with the combiner run at map finish time.

Note the sort-phase which is shown in the web interface is misleading. It is just pure merging.

OTHER TIPS

There are two opportunities for running the Combiner, both on the map side of processing. (A very good online reference is from Tom White's "Hadoop: The Definitive Guide" - https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-6/shuffle-and-sort )

The first opportunity comes on the map side after completing the in-memory sort by key of each partition, and before writing those sorted data to disk. The motivation for running the Combiner at this point is to reduce the amount of data ultimately written to local storage. By running the Combiner here, we also reduce the amount of data that will need to be merged and sorted in the next step. So to the original question posted, yes, the Combiner is already being applied at this early step.

The second opportunity comes right after merging and sorting the spill files. In this case, the motivation for running the Combiner is to reduce the amount of data ultimately sent over the network to the reducers. This stage benefits from the earlier application of the Combiner, which may have already reduced the amount of data to be processed by this step.

The combiner is only going to run how you understand it.

I suspect the reason that the combiner only works in this way is that it reduces the amount of data being sent to the reducers. This is a huge gain in many situations. Meanwhile, in the reducer, the data is already there, and whether you combine them in the sort/merge or in your reduce logic is not really going to matter computationally (it's either done now or later).

So, I guess my point is: you may get gains by combining like you say in the merge, but it's not going to be as much as the map-side combiner.

I haven't gone through the code but in reference to Hadoop : The definitive guide by Tom White 3rd edition, it does mention that if the combiner is specified it will run during the merge phase in the reducer. Following is excerpt from the text:

" The map outputs are copied to the reduce task JVM’s memory if they are small enough (the buffer’s size is controlled by mapred.job.shuffle.input.buffer.percent, which specifies the proportion of the heap to use for this purpose); otherwise, they are copied to disk. When the in-memory buffer reaches a threshold size (controlled by mapred.job.shuffle.merge.percent), or reaches a threshold number of map outputs (mapred.inmem.merge.threshold), it is merged and spilled to disk. If a combiner is specified it will be run during the merge to reduce the amount of data written to disk. "

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top