Domanda

Un semplice riduttore di conteggio parole in Ruby è simile al seguente:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end

wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end

ottiene nello STDIN tutti i valori intermedi dei mappatori. Non da una chiave specifica. Quindi in realtà esiste solo UN riduttore per tutti (e non riduttore per parola o per set di parole).

Tuttavia, negli esempi Java ho visto questa interfaccia che ottiene una chiave e un elenco di valori come inout. Ciò significa che i valori delle mappe intermedie vengono raggruppati per chiave prima di essere ridotti e i riduttori possono funzionare in parallelo:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();
              }
              output.collect(key, new IntWritable(sum));
            }
          }

È solo una funzionalità Java? O posso farlo con Hadoop Streaming usando Ruby?

È stato utile?

Soluzione

I riduttori funzioneranno sempre in parallelo, indipendentemente dal fatto che tu stia utilizzando lo streaming o meno (se non lo vedi, verifica che la configurazione del lavoro sia impostata per consentire più attività di riduzione - vedi mapred.reduce.tasks nel tuo cluster o configurazione del lavoro). La differenza è che il framework impacchetta le cose un po 'più bene quando usi Java rispetto allo streaming.

Per Java, l'attività di riduzione ottiene un iteratore su tutti i valori per una chiave particolare. Ciò semplifica la camminata dei valori se, ad esempio, sommi l'output della mappa nell'attività di riduzione. Nello streaming, ottieni letteralmente un flusso di coppie chiave-valore. sei garantito che i valori verranno ordinati in base alla chiave e che per una determinata chiave non verrà suddivisa tra le attività di riduzione, ma qualsiasi tracciamento dello stato di cui hai bisogno dipende da te. Ad esempio, in Java l'output della tua mappa arriva simbolicamente al tuo riduttore nella forma

chiave1, {val1, val2, val3} chiave2, {val7, val8}

Con lo streaming, l'output appare invece

chiave1, val1 chiave1, val2 chiave1, val3 chiave2, val7 key2, val8

Ad esempio, per scrivere un riduttore che calcola la somma dei valori per ogni chiave, avrai bisogno di una variabile per memorizzare l'ultima chiave che hai visto e una variabile per memorizzare la somma. Ogni volta che leggi una nuova coppia chiave-valore, esegui le seguenti operazioni:

  1. controlla se la chiave è diversa dall'ultima chiave.
  2. in tal caso, genera la chiave e la somma corrente e reimposta la somma a zero.
  3. aggiungi il valore corrente alla somma e imposta l'ultima chiave sulla chiave corrente.

HTH.

Altri suggerimenti

Non ho provato Hadoop Streaming me stesso, ma dalla lettura dei documenti penso che tu possa ottenere un comportamento parallelo simile.

Invece di passare una chiave con i valori associati a ciascun riduttore, lo streaming raggrupperà l'output del mapper in base alle chiavi. Garantisce inoltre che i valori con le stesse chiavi non vengano suddivisi su più riduttori. Questo è in qualche modo diverso dalla normale funzionalità di Hadoop, ma anche così, il lavoro di riduzione sarà distribuito su più riduttori.

Prova a utilizzare l'opzione -verbose per ottenere maggiori informazioni su ciò che sta realmente accadendo. Puoi anche provare a sperimentare l'opzione -D mapred.reduce.tasks = X dove X è il numero desiderato di riduttori.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top