Question

I'm thinking about building a small test application in Hadoop to get the hang of the system.

The application I have in mind will be in the realm of statistics. I want to output "the 10 worst values for each key" from my reducer function (where I must assume the possibility of a huge number of values for some keys).

What I have planned is that the values that go into my reducer will basically be the combination of "The actual value" and "The quality/relevance of the actual value". Based on the relevance I "simply" want to take the 10 worst/best values and output them from the reducer.

How do I go about doing that (assuming a huge number of values for a specific key)? Is there a way that I can sort all values BEFORE they are sent into the reducer (and simply stop reading the input when I have read the first 10) or must this be done differently?

Can someone here point me to a piece of example code I can have a look at?


Update: I found two interesting Jira issues HADOOP-485 and HADOOP-686.

Does anyone have a code fragment showing how to use this with the Hadoop 0.20 API?


Solution

It sounds like you want to use a Combiner, which defines what to do with the values you create on the Map side before they are sent to the Reducer, but after they are grouped by key. The combiner is often set to just be the reducer class (so you reduce on the map side, and then again on the reduce side).

Take a look at how the WordCount example uses the combiner to pre-compute partial counts:

http://wiki.apache.org/hadoop/WordCount


Update: Here's what I have in mind for your problem; it's possible I misunderstood what you are trying to do, though.

Every mapper emits <key, {score, data}> pairs.

The combiner gets a partial set of these pairs, <key, [set of {score, data}]>, does a local sort (still on the mapper nodes), and outputs <key, [sorted set of top 10 local {score, data}]> pairs.

The reducer will get <key, [set of top-10-sets]> -- all it has to do is perform the merge step of sort-merge (no sorting needed) for each of the members of the value sets, and stop merging when the first 10 values are pulled.
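
The combiner and reducer steps above can be sketched in plain Java. This is only an illustration of the logic, not Hadoop code: the class TopN and its methods are hypothetical names, the values are reduced to bare scores (a real job would carry the data along with each score), and "worst" is assumed to mean "lowest score".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java sketch of the combiner/reducer logic; no Hadoop classes are used. */
final class TopN {

    /** Combiner role: keep only the n lowest scores of one partial value set, sorted ascending. */
    static List<Double> localTopN(Iterable<Double> scores, int n) {
        // Max-heap holding at most n scores: the root is the largest score kept so far.
        PriorityQueue<Double> heap = new PriorityQueue<>(Collections.<Double>reverseOrder());
        for (double s : scores) {
            heap.add(s);
            if (heap.size() > n) {
                heap.poll(); // drop the current largest, so the n smallest remain
            }
        }
        List<Double> result = new ArrayList<>(heap);
        Collections.sort(result);
        return result;
    }

    /** Reducer role: merge already-sorted lists and stop once n values have been pulled. */
    static List<Double> mergeTopN(List<List<Double>> sortedLists, int n) {
        // Heap entries are {listIndex, positionInList}, ordered by the score they point at.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
            (a, b) -> Double.compare(sortedLists.get(a[0]).get(a[1]),
                                     sortedLists.get(b[0]).get(b[1])));
        for (int i = 0; i < sortedLists.size(); i++) {
            if (!sortedLists.get(i).isEmpty()) {
                heads.add(new int[] {i, 0});
            }
        }
        List<Double> result = new ArrayList<>();
        while (result.size() < n && !heads.isEmpty()) {
            int[] head = heads.poll();
            List<Double> list = sortedLists.get(head[0]);
            result.add(list.get(head[1]));
            if (head[1] + 1 < list.size()) {
                heads.add(new int[] {head[0], head[1] + 1}); // advance within that list
            }
        }
        return result;
    }
}
```

The bounded heap keeps the combiner's memory use at n entries no matter how many values a key has, and the merge never looks past the first n values overall.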


Update 2

So, now that we know that the rank is cumulative and, as a result, you can't filter the data early by using combiners, the only option is to do what you suggested -- get a secondary sort going. You've found the right tickets; there is an example of how to do this in Hadoop 0.20 in src/examples/org/apache/hadoop/examples/SecondarySort.java (or, if you don't want to download the whole source tree, you can look at the example patch in https://issues.apache.org/jira/browse/HADOOP-4545).

OTHER TIPS

This definitely sounds like a secondary sort problem. Take a look at "Hadoop: The Definitive Guide" from O'Reilly, if you like. You can also access it online. It describes a pretty good implementation.

I implemented it myself, too. Basically it works this way: the partitioner makes sure that all key-value pairs with the same (natural) key go to one single reducer. Nothing special here. But there is also the GroupingComparator, which forms groupings. One group is passed as an iterator to one reduce() call, so a partition can contain multiple groupings, while the number of partitions equals the number of reducers. The sorting itself comes from the ordering of the composite key (its compareTo method or a separate sort comparator); the grouping comparator only decides which of the sorted keys end up in the same reduce() call.

With this method, you can make sure that the 10 best/worst/highest/lowest keys (whichever you need) reach the reducer first. So after you have read these 10 keys, you can leave the reduce method without any further iterations.
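
To make the mechanics concrete, here is a minimal plain-Java simulation of the sort and group steps. It deliberately uses no Hadoop classes: SecondarySortSketch, CompositeKey, SORT, GROUP and firstNPerGroup are hypothetical names for illustration, and "worst" is assumed to mean "lowest score".

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simulates what the shuffle does in a secondary sort; not actual Hadoop code. */
final class SecondarySortSketch {

    /** Composite key: the natural key plus the score the values should be ordered by. */
    static final class CompositeKey {
        final String naturalKey;
        final double score;

        CompositeKey(String naturalKey, double score) {
            this.naturalKey = naturalKey;
            this.score = score;
        }
    }

    /** Sort comparator role: order by natural key first, then by score ascending. */
    static final Comparator<CompositeKey> SORT = Comparator
        .comparing((CompositeKey k) -> k.naturalKey)
        .thenComparingDouble(k -> k.score);

    /** Grouping comparator role: keys with the same natural key share one reduce() call. */
    static final Comparator<CompositeKey> GROUP =
        Comparator.comparing((CompositeKey k) -> k.naturalKey);

    /** Reducer role: for each group, keep only the first n scores of the sorted stream. */
    static Map<String, List<Double>> firstNPerGroup(List<CompositeKey> keys, int n) {
        List<CompositeKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Double>> out = new LinkedHashMap<>();
        CompositeKey previous = null;
        List<Double> current = null;
        for (CompositeKey k : sorted) {
            if (previous == null || GROUP.compare(previous, k) != 0) {
                current = new ArrayList<>(); // a new group starts here
                out.put(k.naturalKey, current);
            }
            if (current.size() < n) {
                current.add(k.score); // a real reduce() would simply return after n values
            }
            previous = k;
        }
        return out;
    }
}
```

In an actual job with the new (org.apache.hadoop.mapreduce) API, these roles would be wired up through Job.setPartitionerClass, Job.setSortComparatorClass and Job.setGroupingComparatorClass, with the partitioner looking only at the natural key.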

Hope that was helpful :-)

If I understand the question properly, you'll need to use a TotalOrderPartitioner.

Licensed under: CC-BY-SA with attribution