Question

I am trying to use SequenceFile to pass data between two mapReduce programs. The data that I want to pass has the format >. For some reason it seems that some entries in the maps do not get passed from one program to the other. Here it's my code, first the reducer that generates de SequenceFileOutput and then the mapper that reads from it.

public static class IntSumReducer extends Reducer {

public void reduce(Text key, Iterable<Text> values, 
                   Context context
                   ) throws IOException, InterruptedException {

    MapWritable vector = new MapWritable() ;

    for (Text val : values){
        if(vector.containsKey(val)){
            vector.put(val , new IntWritable(((IntWritable)vector.get(val)).get() + 1));
        }
        else
            vector.put(val , new IntWritable(1));
    }

    context.write(key, vector);

        }
    }

and the mapper:

public static class TokenizerMapper extends Mapper{

  private final static int cota = 100;
  private final static double ady = 0.25;

  public void map(Text key, MapWritable value, Context context
          ) throws IOException, InterruptedException {

      IntWritable tot = (IntWritable)value.get(key);

      int total = tot.get();


      if(total > cota){
          MapWritable vector = new MapWritable() ;
          Set<Writable> keys = value.keySet();

          Iterator<Writable> iterator = keys.iterator();
          while(iterator.hasNext()){
              Text llave = (Text) iterator.next();
              if(!llave.equals(key)){
                  IntWritable cant = (IntWritable) value.get(llave);
                  double rel = (((double)cant.get())/(double)total);
                  if(cant.get() > cota && rel > ady ){
                      vector.put(llave, new DoubleWritable(rel));
                  }
              }
          }
          context.write(key,vector);     
      }
  }

}

Was it helpful?

Solution

for (Text val : values){
    if(vector.containsKey(val)){
        vector.put(val , new IntWritable(((IntWritable)vector.get(val)).get() + 1));
    }
    else
        vector.put(val , new IntWritable(1));
}

Here lies your problem - the val Text object is reused by hadoop, so when calling vector.put you should be creating a new Text object to break away from the val reference (whose value will change in the next iteration of the for loop).

You can amend you logic to he following and it should work (i've also updated the counter increment logic to be more efficient too):

IntWritable tmpInt;
for (Text val : values){
    tmpInt = (IntWritable) vector.get(val);

    if(tmpInt == null) {
        tmpInt = new IntWritable(0);
        // create a copy of val Text object
        vector.put(new Text(val), tmpInt);
    }

    // update the IntWritable wrapped int value
    tmpInt.set(tmpInt.get() + 1);

    // Note: you don't need to re-insert the IntWritable into the map
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top