Question

I have inputs from two sources:

  1. map output in the form,

    output.collect(new StockKey(new Text(x+" "+id), new Text(id2)), new Text(data));
    
  2. map output in the form,

    output.collect(new StockKey(new Text(x+" "+id), new Text("1")), new Text(data));
    

Job conf:

 conf.setPartitionerClass(CustomPartitioner.class);
 conf.setOutputValueGroupingComparator(StockKeyGroupingComparator.class);

where StockKey is a custom key class made of two Text fields, symbol and timestamp, i.e. (new Text(), new Text()).

Constructor:

public StockKey(){
    this.symbol = new Text();
    this.timestamp = new Text();
}

Grouping comparator:

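import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Groups keys by symbol only (the timestamp is ignored), so all values
// for one symbol are passed to a single reduce() call.
public class StockKeyGroupingComparator extends WritableComparator {

    protected StockKeyGroupingComparator() {
        // true: let WritableComparator create StockKey instances to deserialize into
        super(StockKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey) w1;
        StockKey k2 = (StockKey) w2;

        // Compare the symbol part of the composite key only
        return k1.getSymbol().compareTo(k2.getSymbol());
    }
}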

But the reducer is not receiving the map output values from both inputs.

Only one mapper's output values reach the reducer. I want the records whose symbol, i.e. new Text(x+" "+id), is common to both map outputs to be grouped together in the same reducer. I am stuck here.

Please help!

No correct solution

OTHER TIPS

To do this you need a Partitioner which fits in as follows:

  1. Your mappers output a bunch of records as key/value pairs
  2. For each record, the partitioner is passed the key, the value and the number of reducers. The partitioner decides which reducer will handle the record
  3. The records are shipped off to their respective partitions (reducers)
  4. Within each partition the records are sorted by key, and then the GroupingComparator is run to decide which key/value pairs get grouped into an iterable for a single call to the reduce() method
  5. and so on...
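The steps above can be modelled in plain Java without a cluster. The following is a minimal, Hadoop-free sketch: the class ShuffleSimulation and its records are hypothetical stand-ins (not Hadoop API), and String.hashCode() stands in for the real key's hash. It partitions map output by symbol, sorts each partition by the full key, and starts a new group whenever the symbol changes, which is the effect a grouping comparator on the symbol has.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hadoop-free model of the shuffle: partition, sort, group.
// StockKey here is a plain record standing in for the question's Writable key.
class ShuffleSimulation {

    record StockKey(String symbol, String timestamp) {}
    record Pair(StockKey key, String value) {}

    // Step 2: choose a reducer from the symbol only
    // (String.hashCode() is an assumed stand-in for the Writable key's hash).
    static int partition(StockKey key, int numReducers) {
        return (key.symbol().hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Steps 3-4 for one reducer: keep only its records, sort by the full key,
    // then start a new reduce() call whenever the grouping key (symbol) changes.
    static List<List<Pair>> groupsForReducer(List<Pair> mapOutput, int reducer, int numReducers) {
        List<Pair> mine = new ArrayList<>();
        for (Pair p : mapOutput) {
            if (partition(p.key(), numReducers) == reducer) {
                mine.add(p);
            }
        }
        mine.sort(Comparator.comparing((Pair p) -> p.key().symbol())
                .thenComparing(p -> p.key().timestamp()));

        List<List<Pair>> groups = new ArrayList<>();
        String current = null;
        for (Pair p : mine) {
            if (!p.key().symbol().equals(current)) {
                groups.add(new ArrayList<>());
                current = p.key().symbol();
            }
            groups.get(groups.size() - 1).add(p);
        }
        return groups;
    }
}
```

Feeding it one record per mapper with the same symbol (for example "AAPL 7" with timestamps "20" and "1") yields a single group holding both values, ordered by timestamp.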

I think the default partitioner (HashPartitioner) is choosing the reducer for each record based on the hash of your entire key (that's the default behavior). But you want records grouped by only part of the key (just the symbol, not the symbol and the timestamp). So you need to write a partitioner that does this and configure it in the driver class.
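To see why hashing the whole key can scatter one symbol across reducers, here is a Hadoop-free sketch. HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; the question does not show StockKey.hashCode(), so Objects.hash(symbol, timestamp) is an assumed stand-in for it, and SymbolPartitioning is a hypothetical class name. In a real old-API job the bySymbol expression would go inside getPartition(StockKey key, Text value, int numPartitions) of the CustomPartitioner class, which implements org.apache.hadoop.mapred.Partitioner.

```java
import java.util.Objects;

// Hadoop-free sketch comparing two partitioning strategies for a
// composite (symbol, timestamp) key.
class SymbolPartitioning {

    // Same shape as the default HashPartitioner: the whole key is hashed
    // (Objects.hash is an assumed stand-in for StockKey.hashCode()),
    // so the timestamp influences which reducer is chosen.
    static int byWholeKey(String symbol, String timestamp, int numReducers) {
        return (Objects.hash(symbol, timestamp) & Integer.MAX_VALUE) % numReducers;
    }

    // What the custom partitioner should do: hash only the symbol and ignore
    // the timestamp, so every record for a symbol lands on the same reducer.
    static int bySymbol(String symbol, String timestamp, int numReducers) {
        return (symbol.hashCode() & Integer.MAX_VALUE) % numReducers;
    }
}
```

With four reducers, bySymbol("AAPL 7", t, 4) returns the same partition for every timestamp t, while byWholeKey sends timestamps "1" and "2" of that symbol to different reducers.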

Once you do that, your grouping comparator should group the records as you intended.

EDIT: random thoughts

  • You might make things easier on yourself if you moved the timestamp to the value, making the key simple (just the symbol) and the value complex (timestamp and value). Then you wouldn't need a partitioner or a grouping comparator.
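The "simple key, complex value" layout from the first tip can be sketched without Hadoop as follows. SimpleKeyGrouping and its Value record are hypothetical names, and the TreeMap merely stands in for the shuffle: with the symbol as the whole key, the default partitioner and key comparison already bring every value for one symbol, from either mapper, into a single group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hadoop-free sketch of the "simple key, complex value" layout: the key is
// just the symbol, and the timestamp travels inside the value.
class SimpleKeyGrouping {

    record Value(String timestamp, String data) {}

    // Stand-in for the shuffle: records with an equal key (the symbol)
    // end up in the same group, i.e. the same reduce() call.
    static Map<String, List<Value>> shuffle(List<Map.Entry<String, Value>> mapOutput) {
        Map<String, List<Value>> groups = new TreeMap<>();
        for (Map.Entry<String, Value> e : mapOutput) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return groups;
    }
}
```

The trade-off is that Hadoop does not sort the values inside a group, so if the reducer needs them in timestamp order it has to sort them itself (or keep the composite key plus partitioner and grouping comparator).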
  • You didn't say either way, but you did use the MultipleInputs class, right? That's the only way to invoke two or more mappers for the same job.
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow