Parallelisierung Ruby-Reduzierungen in Hadoop?
Frage
Ein einfaches wordcount Minderer in Ruby sieht wie folgt aus:
#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end
wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end
es wird in der STDIN all Mapper Zwischenwerte. Nicht von einem bestimmten Schlüssel. Also eigentlich gibt es nur eine Reduzierung für alle (und nicht pro Wort Minderer oder pro Gruppe von Wörtern).
Doch auf Java Beispiele habe ich diesen Schnittstelle, die einen Schlüssel und Werteliste als inout bekommt. Was bedeutet, intermidiate Kartenwerte werden durch Schlüssel gruppiert, bevor reduziert und Reduzierungen parallel ausgeführt werden können:
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
Ist das ein Java nur Feature? Oder kann ich es mit Hadoop Streaming Rubin mit?
Lösung
Reduzierungen werden immer parallel laufen, egal ob Sie Streaming verwenden oder nicht (wenn Sie dies nicht zu sehen sind, stellen Sie sicher, dass die Job-Konfiguration mehr reduzieren Aufgaben ermöglichen eingestellt ist - siehe mapred.reduce.tasks im Cluster oder Job-Konfiguration). Der Unterschied besteht darin, dass die Rahmenpakete Dinge ein wenig mehr schön für Sie, wenn Sie Java im Vergleich zu Streaming verwenden.
Für Java wird die Aufgabe reduzieren einen Iterator über alle Werte für einen bestimmten Schlüssel. Dies macht es leicht, die Werte zu gehen, wenn Sie sind, sagen wir, die Karte Ausgabe in Ihrer reduzieren Aufgabe Summieren. In Streaming erhalten Sie buchstäblich nur einen Strom von Schlüssel-Wert-Paaren. Sie sind garantiert, dass die Werte von Schlüssel bestellt, und dass für einen bestimmten Schlüssel nicht über Aufgaben reduzieren aufgeteilt werden, aber jeder Staat Tracking Sie brauchen, ist Ihnen überlassen. Zum Beispiel in Java Ihre Karte Ausgabe kommt zu dem Minderer symbolisch in Form
key1, {val1, val2, val3} key2, {val7, Val8}
Mit Streaming, Ihre Ausgabe sieht statt wie
key1, val1 key1, val2 key1, val3 key2, val7 key2, Val8
Um zum Beispiel einen Druckminderer zu schreiben, die die Summe der Werte für jeden Schlüssel berechnet, werden Sie eine Variable benötigen den letzten Schlüssel speichern Sie sahen und eine Variable, die Summe zu speichern. Jedes Mal, wenn Sie ein neues Schlüssel-Wert-Paar zu lesen, haben Sie folgende Möglichkeiten:
- prüfen, ob der Schlüssel als der letzte Schlüssel unterscheidet.
- wenn ja, Ausgabe Ihres Schlüssel und aktuelle Summe, und setzen Sie die Summe auf Null zurück.
- fügen Sie den aktuellen Wert Ihre Summe und letzten Schlüssel zum aktuellen Schlüssel festgelegt.
HTH.
Andere Tipps
Ich habe nicht Hadoop Streaming selbst versucht, aber in der Dokumentenlese Ich glaube, Sie ähnliche Parallelverhalten erreichen kann.
Statt einen Schlüssel mit den zugehörigen Werten zu jedem Minderer vorbei, Streaming Willen Gruppe der Mapper Ausgabe von Schlüsseln. Es gewährleistet auch, dass die Werte mit den gleichen Tasten nicht mehr Reduzierungen uneins über werden. Dies ist etwas anders als normale Hadoop-Funktionalität, aber auch so wird die Verringerung der Arbeit über mehrere Reduzierungen verteilt werden.
Versuchen Sie, die -verbose
Option zu verwenden, um mehr Informationen zu bekommen, was wirklich vor sich geht. Sie können auch versuchen, mit der -D mapred.reduce.tasks=X
Option zu experimentieren, wobei X die gewünschte Anzahl von Reduzierungen ist.