Domanda

so when we use Java for writing map/reduce program, the map collects the data and reducer receives the list of values per key, like

Map(k, v) -> k1, v1  
    then shuffle and sort happens  
    then reducer gets it  

reduce(k1, List<values>)  

to work on. but is it possible to do the same with python using streaming? I used this as reference and seems like reducer gets data per line as supplied on command-line

È stato utile?

Soluzione

In Hadoop Streaming, the mapper writes key-value pairs to sys.stdout. Hadoop does the shuffle and sort and directs the results to the mapper in sys.stdin. How you actually handle the map and the reduce is entirely up to you, so long as you follow that model (map to stdout, reduce from stdin). This is why it can be tested independently of Hadoop via cat data | map | sort | reduce on the command line.

The input to the reducer is the same key-value pairs that were mapped, but comes in sorted. You can iterate through the results and accumulate totals as the example demonstrates, or you can take it further and pass the input to itertools.groupby() and that will give you the equivalent to the k1, List<values> input that you are used to, and which work well the the reduce() builtin.

The point being that it's up to you to implement the reduce.

Altri suggerimenti

May be this can help you. I found this from apache... org

Customizing the Way to Split Lines into Key/Value Pairs As noted earlier, when the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value.

However, you can customize this default. You can specify a field separator other than the tab character (the default), and you can specify the nth (n >= 1) character rather than the first character in a line (the default) as the separator between the key and value. For example:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 

In the above example, -D stream.map.output.field.separator=. specifies "." as the field separator for the map outputs, and the prefix up to the fourth "." in a line will be the key and the rest of the line (excluding the fourth ".") will be the value. If a line has less than four "."s, then the whole line will be the key and the value will be an empty Text object (like the one created by new Text("")).

Similarly, you can use -D stream.reduce.output.field.separator=SEP and -D stream.num.reduce.output.fields=NUM to specify the nth field separator in a line of the reduce outputs as the separator between the key and the value.

Similarly, you can specify stream.map.input.field.separator and stream.reduce.input.field.separator as the input separator for map/reduce inputs. By default the separator is the tab character.

PipeReducer is the reducer implementation for Hadoop streaming. The reducer gets key/values, iterates it and sends to the STDIN as key/value and not key/values. This is the default behavior of Hadoop streaming. I don't see any option to change this, unless the Hadoop code has been modified.

public void reduce(Object key, Iterator values, OutputCollector output,
                 Reporter reporter) throws IOException {

    .....
    while (values.hasNext()) {
    .....
        inWriter_.writeKey(key);
        inWriter_.writeValue(val);
    .....      
    }
}

As per Hadoop's streaming references here:

  • The key is, by default, the prefix of the line before the first tab.

when the Map/Reduce framework reads a line from the stdout of the mapper, it splits the line into a key/value pair. By default, the prefix of the line up to the first tab character is the key and the rest of the line (excluding the tab character) is the value.

  • Delimiters and keys positions can be customized.

However, you can customize this default. You can specify a field separator other than the tab character (the default), and you can specify the nth (n >= 1) character rather than the first character in a line (the default) as the separator between the key and value. For example:

  • Sample code:

    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer org.apache.hadoop.mapred.lib.IdentityReducer

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top