Ruby 中的简单字数缩减器如下所示:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end

wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end

它在 STDIN 中获取所有映射器的中间值。不是来自特定的键。所以实际上只有一个reducer适用于所有(而不是每个单词或每组单词的reducer)。

然而,在 Java 示例中,我看到这个接口以 inout 的形式获取键和值列表。这意味着中间映射值在减少之前按键分组,并且减少器可以并行运行:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();
              }
              output.collect(key, new IntWritable(sum));
            }
          }

这是 Java 独有的功能吗?或者我可以使用 Ruby 通过 Hadoop Streaming 来实现吗?

有帮助吗?

解决方案

无论您是否使用流式处理,Reducers 都将始终并行运行(如果您没有看到这一点,请验证作业配置是否设置为允许多个 Reduce 任务 - 请参阅集群或作业配置中的 mapred.reduce.tasks )。区别在于,当您使用 Java 而不是流式处理时,框架会更好地为您打包。

对于 Java,reduce 任务获取特定键的所有值的迭代器。例如,如果您要在reduce 任务中对map 输出求和,那么就可以轻松遍历这些值。在流式传输中,您实际上只是获得键值对流。你 保证值将按键排序,并且对于给定的键不会在reduce任务之间分割,但您需要的任何状态跟踪都取决于您。例如,在 Java 中,您的映射输出以以下形式象征性地到达您的减速器

key1,{val1,val2,val3} key2,{val7,val8}

通过流式传输,您的输出看起来像

key1, val1 key1, val2 key1, val3 key2, val7 key2, val8

例如,要编写一个计算每个键的值之和的化简器,您需要一个变量来存储您看到的最后一个键,并需要一个变量来存储总和。每次读取新的键值对时,您都会执行以下操作:

  1. 检查该密钥是否与上一个密钥不同。
  2. 如果是,则输出您的密钥和当前总和,并将总和重置为零。
  3. 将当前值添加到总和中,并将最后一个键设置为当前键。

HTH。

其他提示

我还没有尝试过的Hadoop流自己,而是来自阅读,我认为可以实现类似的并行行为的文档。

不是传递密钥与相关联的值,以各减速机,通过密钥流意愿组映射器输出的。同时也保证了具有相同键的值不会被多个减速劈了过来。这是从正常的Hadoop功能有些不同,但即便如此,减少工作将在多个变径分布。

尝试使用-verbose选项来获得有关到底发生了什么的更多信息。你也可以尝试与-D mapred.reduce.tasks=X选项进行实验,其中X为减速器所需的号码。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top