Question

If I want to keep the current value from an iterator so that I can compare it with the next value in a reduce method, Hadoop requires me to copy it instead of simply assigning its reference to a temporary variable.

I am about to post the code to my reducer.

You will see two parts:

  1. main method for testing in Eclipse
  2. reduce method for executing in Hadoop

You will notice that the two pieces of code are identical, except for the following:

  1. The main method gets an Iterator from an ArrayList that I hard-coded into it, whereas the reduce method gets an Iterator over the values emitted by the mapper.
  2. The main method does not call context.write, of course.

Here is the code that both pretty much share:

MMI currentMMI = null;
MMI previousMMI = null;
UltraAggregation currentAggregation = null;

while (values.hasNext()) {
    currentMMI = values.next();
    if (currentAggregation == null) {
        currentAggregation = new UltraAggregation(currentMMI);
    }
    if (previousMMI == null) {
        //previousMMI = new MMI(currentMMI);
        previousMMI = currentMMI;
        continue;
    }
    System.out.println();
    System.out.println("currentMMI = " + currentMMI);
    System.out.println("previousMMI = " + previousMMI);
    System.out.println("equals? " + currentMMI.equals(previousMMI));
    System.out.println("==? " + (currentMMI == previousMMI));
    System.out.println();

    // Business logic goes here and involves a context.write on certain conditions

    previousMMI = currentMMI;
}
//final context.write

You will notice that at the end of each iteration I assign the just-used MMI ("currentMMI") to the variable "previousMMI". Then, on the next iteration, I assign the result of next() to currentMMI. When I run my main method in Eclipse, both of the following expressions evaluate to false, as expected:

currentMMI == previousMMI;
currentMMI.equals(previousMMI);  

However, when the same code runs in Hadoop, the following two expressions always evaluate to true for currentMMI and previousMMI:

currentMMI == previousMMI;
currentMMI.equals(previousMMI);

Only if I change the line previousMMI = currentMMI to previousMMI = new MMI(currentMMI) do they evaluate to false. (I wrote a constructor for the MMI class that essentially makes a shallow copy of the incoming parameter.)

Why do I have to clone instead of assigning the reference when the code runs in the Hadoop reducer, but not in the main method?

I am now going to copy and paste the reducer class, which has the 2 parts: the main method for eclipse testing and the reduce method for actual use in Hadoop.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.cisco.webex.hadoop.ultrautility.models.MMI;
import com.cisco.webex.hadoop.ultrautility.models.UltraAggregation;

public class MMIReducer extends Reducer<Text, MMI, Object, UltraAggregation> {
    public static void main(String[] args) {
        ArrayList<MMI> mmis = new ArrayList<MMI>();
        mmis.add(new MMI("961864,1,1,1,D1,10,0,2013-08-02 06:00:00.0,USA,N,N"));
        mmis.add(new MMI("961865,1,1,1,D1,10,1,2013-08-02 07:00:00.0,USA,N,N"));
        mmis.add(new MMI("961866,1,1,1,D1,10,2,2013-08-02 08:00:00.0,USA,N,N"));
        mmis.add(new MMI("961867,1,1,1,D1,10,3,2013-08-02 09:00:00.0,USA,N,N"));
        mmis.add(new MMI("961868,1,1,1,D1,10,4,2013-08-02 10:00:00.0,USA,N,N"));
        mmis.add(new MMI("961869,1,1,1,D1,10,5,2013-08-02 11:00:00.0,USA,N,N"));
        mmis.add(new MMI("961870,1,1,1,D1,10,6,2013-08-02 12:00:00.0,USA,N,N"));
        mmis.add(new MMI("961871,1,1,1,D1,10,7,2013-08-02 13:00:00.0,USA,N,N"));
        mmis.add(new MMI("961872,1,1,1,D1,10,8,2013-08-02 14:00:00.0,USA,N,N"));
        mmis.add(new MMI("961873,1,1,1,D1,10,9,2013-08-02 15:00:00.0,USA,N,N"));

        Iterator<MMI> values = mmis.iterator();

        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI);
            /*
            * THIS DOESNT CAUSE LOGIC ERRORS IN MAIN METHOD
            */
            previousMMI = currentMMI;
        }
        //context.write(null, currentAggregation);
    }

    @Override
    public void reduce(Text key, Iterable<MMI> vals, Context context) throws IOException, InterruptedException {
        Iterator<MMI> values = vals.iterator();

        //key = deviceId
        MMI currentMMI = null;
        MMI previousMMI = null;
        UltraAggregation currentAggregation = null;

        while (values.hasNext()) {
            currentMMI = values.next();
            if (currentAggregation == null) {
                currentAggregation = new UltraAggregation(currentMMI);
            }
            if (previousMMI == null) {
                System.out.println("PreviousMMI is null, setting previousMMI to current MMI and continuing");
                //previousMMI = new MMI(currentMMI);
                previousMMI = currentMMI;
                continue;
            }
            System.out.println();
            System.out.println("currentMMI = " + currentMMI);
            System.out.println("previousMMI = " + previousMMI);
            System.out.println("equals? " + currentMMI.equals(previousMMI));
            System.out.println("==? " + (currentMMI == previousMMI));
            System.out.println();

            // Business logic goes here and involves a context.write on certain conditions

            //previousMMI = new MMI(currentMMI); //Acts as intended
            /*
            * THIS CAUSES ERRORS WHEN EXECUTED THROUGH HADOOP
            */
            previousMMI = currentMMI; // Causes errors
        }
        context.write(null, currentAggregation);
    }
}

Here is a truncated result from stdout when I execute the main method in Eclipse with the static values:

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 06:00:00 PDT 2013;Uptime|0.0
equals? false
==? false


currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 08:00:00 PDT 2013;Uptime|2.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Fri Aug 02 07:00:00 PDT 2013;Uptime|1.0
equals? false
==? false

Here is a truncated result when I execute the hadoop jar:

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 06:00:00 PDT 2013;Uptime|0.0
equals? true
==? true

currentMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
previousMMI = Device Id|D1;Entitlement Tag|10;Device Time|Sun Aug 04 07:00:00 PDT 2013;Uptime|1.0
equals? true
==? true

Why must I clone it for Hadoop but not in Eclipse?


Solution

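Keeping every value in memory would be very inefficient, so Hadoop loads the values one at a time and reuses the same object for each of them. Assigning the reference therefore leaves previousMMI and currentMMI pointing at that single instance, which is why both == and equals return true unless you copy the value. See this other SO question for a good explanation. Summary: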

[...] when looping through the Iterable value list, each Object instance is re-used, so it only keeps one instance around at a given time.

OTHER TIPS

The reducer's iterable reuses the same object for each new value and just sets its payload, rather than creating a new object. The idea is that if the iterator walks over many millions of values, the object creation and garbage collection overhead becomes a non-trivial part of the total CPU load.
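To make the reuse concrete without a cluster, here is a minimal plain-Java sketch. It does not use Hadoop's classes: Record and ReusingIterator are hypothetical stand-ins for MMI and for the reducer's value iterator, and the iterator only imitates the behavior described above by overwriting one shared instance on every next(). The countChanges method compares each value with the previous one, keeping the previous value either by reference or through a copy constructor.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ReuseDemo {
    /** Hypothetical mutable value, standing in for a class such as MMI. */
    static final class Record {
        int uptime;

        Record() {}

        /** Copy constructor: takes a snapshot of the other record's fields. */
        Record(Record other) {
            this.uptime = other.uptime;
        }
    }

    /** Imitates the reducer's value iterator: one instance, overwritten on each next(). */
    static final class ReusingIterator implements Iterator<Record> {
        private final int[] data;
        private final Record shared = new Record();
        private int pos = 0;

        ReusingIterator(int... data) {
            this.data = data;
        }

        @Override
        public boolean hasNext() {
            return pos < data.length;
        }

        @Override
        public Record next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            shared.uptime = data[pos++]; // overwrite the payload of the shared object
            return shared;               // always the same reference
        }
    }

    /** Counts consecutive pairs whose uptime differs, keeping "previous" by reference or by copy. */
    static int countChanges(Iterator<Record> values, boolean copy) {
        Record previous = null;
        int changes = 0;
        while (values.hasNext()) {
            Record current = values.next();
            if (previous != null && previous.uptime != current.uptime) {
                changes++;
            }
            // By reference, previous aliases the shared object and changes along with it.
            previous = copy ? new Record(current) : current;
        }
        return changes;
    }

    public static void main(String[] args) {
        System.out.println(countChanges(new ReusingIterator(0, 1, 2), false)); // prints 0
        System.out.println(countChanges(new ReusingIterator(0, 1, 2), true));  // prints 2
    }
}
```

With an ArrayList, as in the question's main method, every element is a distinct object, so keeping the reference works. With a reusing iterator, only the copy preserves the earlier payload, at the cost of one extra allocation per value.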

Licensed under: CC-BY-SA with attribution