Question

I'm trying to understand this code for canopy clustering. The purpose of these two classes (one map, one reduce) is to find canopy centers. My problem is that I don't understand the difference between the map and reduce functions. They are nearly the same.

So is there a difference? Or am I just repeating the same process again in the reducer?

I think the answer is that there is a difference in how the map and reduce functions handle the code. They perform different actions on the data even with similar code.

So can someone please explain the process of the map and reduce when we try to find the canopy centers?

I know, for example, that a map might look like this -- (joe, 1) (dave, 1) (joe, 1) (joe, 1)

and then the reduce will go like this: --- (joe, 3) (dave, 1)

Does the same type of thing happen here?

Or maybe I'm performing the same task twice?

Thanks so much.

map function:

package nasdaq.hadoop;

import java.io.*;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;

public class CanopyCentersMapper extends Mapper<LongWritable, Text, Text, Text> {
    //A list with the centers of the canopy
    private ArrayList<ArrayList<String>> canopyCenters;

    @Override
    public void setup(Context context) {
        this.canopyCenters = new ArrayList<ArrayList<String>>();
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Separate the stock name from the values to create a key of the stock and a list of values - what is list of values?
        //What exactly are we splitting here?
        ArrayList<String> stockData = new ArrayList<String>(Arrays.asList(value.toString().split(",")));

        //Remove the stock name (first field); the remaining values are the candidate canopy center
        String stockKey = stockData.remove(0);

        //?
        String stockValue = StringUtils.join(",", stockData);

        //Check whether the stock is available for usage as a new canopy center
        boolean isClose = false;

        for (ArrayList<String> center : canopyCenters) {    //Run over the centers

            //I think...let's say at this point we have a few centers. Then we have our next point to check.
            //We have to compare that point with EVERY center already created. If the distance is larger than T1
            //for EVERY center then that point becomes a new center! But the more canopies we have, the better
            //the chance it is within the radius of one of the canopies...

            //Measure the distance between the center and the currently checked point
            if (ClusterJob.measureDistance(center, stockData) <= ClusterJob.T1) {
                //Point is too close to an existing center
                isClose = true;
                break;
            }
        }

        //The point is farther than T1 from every existing center, so make it a new center
        if (!isClose) {
            //Not too close to any center, add the current data to the centers list
            canopyCenters.add(stockData);

            //Prepare hadoop data for output
            Text outputKey = new Text();
            Text outputValue = new Text();

            outputKey.set(stockKey);
            outputValue.set(stockValue);

            //Output the stock key and values to reducer
            context.write(outputKey, outputValue);
        }
    }

}

Reduce function:

package nasdaq.hadoop;

import java.io.*;
import java.util.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class CanopyCentersReducer extends Reducer<Text, Text, Text, Text> {
    //The canopy centers list
    private ArrayList<ArrayList<String>> canopyCenters;

    @Override
    public void setup(Context context) {
        //Create a new list for the canopy centers
        this.canopyCenters = new ArrayList<ArrayList<String>>();
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            //Format the value and key to fit the format
            String stockValue = value.toString();
            ArrayList<String> stockData = new ArrayList<String>(Arrays.asList(stockValue.split(",")));
            String stockKey = key.toString();

            //Check whether the stock is available for usage as a new canopy center
            boolean isClose = false;
            for (ArrayList<String> center : canopyCenters) {    //Run over the centers
                //Measure the distance between the center and the currently checked point
                if (ClusterJob.measureDistance(center, stockData) <= ClusterJob.T1) {
                    //Point is too close to an existing center
                    isClose = true;
                    break;
                }
            }

            //The point is farther than T1 from every existing center, so make it a new center
            if (!isClose) {
                //Not too close to any center, add the current data to the centers list
                canopyCenters.add(stockData);

                //Prepare hadoop data for output
                Text outputKey = new Text();
                Text outputValue = new Text();

                outputKey.set(stockKey);
                outputValue.set(stockValue);

                //Write the stock key and values to the job output
                context.write(outputKey, outputValue);
            }
        }
    }
}

Edit -- more code and explanation

stockKey is the key representing a stock (NASDAQ and things like that).

ClusterJob.measureDistance():

public static double measureDistance(ArrayList<String> origin, ArrayList<String> destination)
{
    double deltaSum = 0.0;
    //Run over all points in the origin vector and calculate the sum of the squared deltas
    for (int i = 0; i < origin.size(); i++) {
        if (destination.size() > i) //Only add to sum if there is a destination to compare to
        {
            deltaSum = deltaSum + Math.pow(Math.abs(Double.valueOf(origin.get(i)) - Double.valueOf(destination.get(i))),2);
        }
    }
    //Return the square root of the sum
    return Math.sqrt(deltaSum);
}

Solution

Ok, the straightforward interpretation of the code is:

- Each mapper walks over some (presumably random) subset of the data and generates canopy centers, all of which are at least T1 distance from each other. These centers are emitted.
- The reducer then walks over all the canopy centers that belong to each specific stock key (like MSFT, GOOG, etc.) from all the mappers, and ensures that no two canopy centers for the same stock key are within T1 of each other (e.g., no two centers in GOOG are within T1 of each other, although a center in MSFT and a center in GOOG may be close together).
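To make the "same code, different scope" point concrete, here is a minimal sketch of the greedy pass that both classes run, written in plain Java without Hadoop. The class name GreedyCanopyPass, the use of double[] points, and the sample data are assumptions for illustration only; the posted job works on lists of strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the posted job: the greedy pass that both
// the mapper and the reducer perform on whatever points they happen to see.
final class GreedyCanopyPass {

    // Plain Euclidean distance; assumes both vectors have the same length.
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Keep a point as a center only if it is farther than t1 from every
    // center kept so far. The result depends on the order of the input.
    static List<double[]> selectCenters(List<double[]> points, double t1) {
        List<double[]> centers = new ArrayList<>();
        for (double[] p : points) {
            boolean isClose = false;
            for (double[] c : centers) {
                if (distance(c, p) <= t1) {
                    isClose = true;
                    break;
                }
            }
            if (!isClose) {
                centers.add(p);
            }
        }
        return centers;
    }

    public static void main(String[] args) {
        double t1 = 2.0;
        // Two input splits, each seen by a different (simulated) mapper.
        List<double[]> split1 = Arrays.asList(
                new double[]{0, 0}, new double[]{1, 0}, new double[]{10, 0});
        List<double[]> split2 = Arrays.asList(
                new double[]{0.5, 0}, new double[]{20, 0});

        // "Map" phase: each split picks its own local candidates.
        List<double[]> candidates = new ArrayList<>(selectCenters(split1, t1));
        candidates.addAll(selectCenters(split2, t1));

        // "Reduce" phase: the same pass again, now over the merged candidates.
        List<double[]> centers = selectCenters(candidates, t1);
        System.out.println(candidates.size() + " candidates -> " + centers.size() + " centers");
        // prints "4 candidates -> 3 centers"
    }
}
```

The second pass is not redundant: (0, 0) and (0.5, 0) are each a valid local center in their own split, and only a pass that sees both can drop one of them.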

The goal of the code is unclear; personally, I think there has to be a bug. The reducers basically solve the problem as if you were trying to generate centers for each stock key independently (i.e., calculate canopy centers for all data points for GOOG), while the mappers seem to solve the problem of generating centers across all stocks. Put together like that, you get a contradiction, so neither problem actually gets solved.

If you want centers for all stock keys, then the map output must send everything to ONE reducer. Set the map output key to something trivial like a NullWritable. Then the reducer's filtering logic will do the right thing without change.
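Why a trivial key funnels everything into one place can be seen with a small simulation of the shuffle, again in plain Java rather than Hadoop. ShuffleGroupingSketch, shuffle and withConstantKey are hypothetical names, and the empty string stands in for NullWritable. One detail this sketch assumes: once the key is constant, the stock name has to travel inside the value, otherwise it is lost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of the shuffle step: map output records are grouped
// by key, and each group is handed to a single reduce() call.
final class ShuffleGroupingSketch {

    // Group {key, value} pairs by key, the way the framework does before reduce.
    static Map<String, List<String>> shuffle(List<String[]> mapOutput) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : mapOutput) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    // Re-key every record under one constant key (the role NullWritable would
    // play) and move the stock name into the value so it is not lost.
    static List<String[]> withConstantKey(List<String[]> mapOutput) {
        List<String[]> out = new ArrayList<>();
        for (String[] kv : mapOutput) {
            out.add(new String[]{"", kv[0] + "," + kv[1]});
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = Arrays.asList(
                new String[]{"GOOG", "1.0,2.0"},
                new String[]{"MSFT", "1.1,2.0"},
                new String[]{"GOOG", "9.0,9.0"});

        // Keyed by stock: GOOG and MSFT candidates land in separate groups.
        System.out.println(shuffle(mapOutput).size());                   // prints 2
        // Constant key: every candidate lands in the same group.
        System.out.println(shuffle(withConstantKey(mapOutput)).size());  // prints 1
    }
}
```

In a real job the reducer's declared input key type would also have to match the new map output key type.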

If you want centers for EACH stock key, then the mapper needs to be changed so that it effectively keeps one separate canopy list per stock key. You can either keep a separate ArrayList for each stock key (preferred, since it will be faster), or change the distance metric so that points belonging to different stock keys are an infinite distance apart (so they never interact).
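The "separate list per stock key" option could look roughly like the following. This is a sketch under assumptions, not a drop-in replacement for the posted mapper: PerKeyCanopyCenters, offer and centerCount are made-up names, and points are already parsed into double[].

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one canopy-center list per stock key, so points from
// different stocks are never compared with each other.
final class PerKeyCanopyCenters {
    private final Map<String, List<double[]>> centersByKey = new HashMap<>();
    private final double t1;

    PerKeyCanopyCenters(double t1) {
        this.t1 = t1;
    }

    // Plain Euclidean distance; assumes both vectors have the same length.
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Returns true if the point became a new center for its stock key,
    // false if it was within t1 of an existing center for that key.
    boolean offer(String stockKey, double[] point) {
        List<double[]> centers = centersByKey.computeIfAbsent(stockKey, k -> new ArrayList<>());
        for (double[] c : centers) {
            if (distance(c, point) <= t1) {
                return false;
            }
        }
        centers.add(point);
        return true;
    }

    // Number of centers accepted so far for the given stock key.
    int centerCount(String stockKey) {
        return centersByKey.getOrDefault(stockKey, Collections.emptyList()).size();
    }

    public static void main(String[] args) {
        PerKeyCanopyCenters canopies = new PerKeyCanopyCenters(1.0);
        System.out.println(canopies.offer("GOOG", new double[]{0.0, 0.0})); // true
        System.out.println(canopies.offer("MSFT", new double[]{0.1, 0.0})); // true, different key
        System.out.println(canopies.offer("GOOG", new double[]{0.5, 0.0})); // false, too close
        System.out.println(canopies.offer("GOOG", new double[]{5.0, 0.0})); // true
    }
}
```

In the mapper, a true result from offer would be the point at which the record is written to the context; only the lookup by key is new compared with the single shared list.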

P.S. By the way, there are also some unrelated issues with your distance metric. First, you're parsing the data using Double.valueOf, but not catching NumberFormatException. The mapper strips the stock name from the front of each row, but if any remaining field is not a number (a header line, a date, an empty field), the job will crash as soon as it reaches that row. Second, the distance metric ignores any fields with missing values. That is an incorrect implementation of an L2 (Euclidean) distance metric. To see why, consider that a row with no values (for example "GOOG,") has distance 0 from any other point, and if it is chosen as a canopy center, no other centers can be chosen. Instead of just setting the delta for a missing dimension to zero, you might consider filling the missing value with something reasonable, like the population mean for that attribute, or (to be safe) just discarding that row from the data set for the purposes of clustering.
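As a rough illustration of the "discard the row" option, here is one way the parsing and the distance could be made strict. StrictDistance, parseRow and euclidean are hypothetical names, and returning an empty Optional for a bad row is just one possible policy.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: parse defensively, and refuse to compare vectors of
// different lengths instead of silently skipping the missing dimensions.
final class StrictDistance {

    // Returns the parsed row, or empty if the row has no fields or any field
    // is blank or not a number, so the caller can skip the row.
    static Optional<double[]> parseRow(List<String> fields) {
        if (fields.isEmpty()) {
            return Optional.empty();
        }
        double[] out = new double[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            try {
                out[i] = Double.parseDouble(fields.get(i).trim());
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        }
        return Optional.of(out);
    }

    // L2 distance that requires both vectors to have the same dimension.
    static double euclidean(double[] a, double[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException(
                    "dimension mismatch: " + a.length + " vs " + b.length);
        }
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        System.out.println(parseRow(Arrays.asList("1.5", "oops")).isPresent()); // false
        System.out.println(parseRow(Arrays.asList("3", "4")).isPresent());      // true
        System.out.println(euclidean(new double[]{0, 0}, new double[]{3, 4}));  // 5.0
    }
}
```

With this shape, a row that fails to parse never reaches the canopy list, so an empty or short row can no longer act as a center that swallows every other point.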

Licensed under: CC-BY-SA with attribution