Pregunta

Un simple reductor de conteo de palabras en Ruby se ve así:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end

wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end

obtiene en el STDIN todos los valores intermedios de los mapeadores. No de una clave específica. Entonces, en realidad, solo hay UN reductor para todos (y no un reductor por palabra o por conjunto de palabras).

Sin embargo, en los ejemplos de Java, vi esta interfaz que obtiene una clave y una lista de valores como inout. Lo que significa que los valores del mapa intermedio se agrupan por clave antes de reducirse y los reductores pueden ejecutarse en paralelo:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();
              }
              output.collect(key, new IntWritable(sum));
            }
          }

¿Es esta una característica única de Java? ¿O puedo hacerlo con Hadoop Streaming usando Ruby?

¿Fue útil?

Solución

Los reductores siempre se ejecutarán en paralelo, ya sea que esté utilizando transmisión o no (si no está viendo esto, verifique que la configuración del trabajo esté configurada para permitir múltiples tareas de reducción; consulte mapred.reduce.tasks en su clúster o configuración del trabajo). La diferencia es que el framework empaqueta las cosas un poco mejor cuando usa Java versus streaming.

Para Java, la tarea de reducción obtiene un iterador sobre todos los valores para una clave en particular. Esto facilita caminar los valores si, por ejemplo, está sumando la salida del mapa en su tarea de reducción. En la transmisión, literalmente obtienes una secuencia de pares clave-valor. Tiene garantizado de que los valores se ordenarán por clave, y que para una clave determinada no se dividirá en tareas de reducción, pero cualquier seguimiento de estado que necesite depende de usted. Por ejemplo, en Java, la salida de su mapa llega simbólicamente a su reductor en la forma

clave1, {val1, val2, val3} clave2, {val7, val8}

Con la transmisión, su salida se ve como

clave1, val1 clave1, val2 clave1, val3 clave2, val7 clave2, val8

Por ejemplo, para escribir un reductor que calcule la suma de los valores para cada clave, necesitará una variable para almacenar la última clave que vio y una variable para almacenar la suma. Cada vez que lee un nuevo par clave-valor, hace lo siguiente:

  1. compruebe si la clave es diferente a la última clave.
  2. si es así, envíe su clave y la suma actual, y restablezca la suma a cero.
  3. agregue el valor actual a su suma y establezca la última clave en la clave actual.

HTH.

Otros consejos

No he probado Hadoop Streaming, pero al leer los documentos creo que puedes lograr un comportamiento paralelo similar.

En lugar de pasar una clave con los valores asociados a cada reductor, la transmisión agrupará la salida del mapeador por claves. También garantiza que los valores con las mismas claves no se dividirán en múltiples reductores. Esto es algo diferente de la funcionalidad normal de Hadoop, pero aun así, el trabajo de reducción se distribuirá entre múltiples reductores.

Intente utilizar la opción -verbose para obtener más información sobre lo que realmente está sucediendo. También puede intentar experimentar con la opción -D mapred.reduce.tasks = X donde X es el número deseado de reductores.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top