HadoopでRubyレデューサーを並列化していますか?
質問
Rubyの単純な単語数削減機能は次のようになります。
#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end
wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end
すべてのマッパーの中間値を標準入力にします。特定のキーからではありません。 そのため、実際にはすべてのリデューサーは1つだけです(ワードごとまたはワードセットごとのリデューサーはありません)。
ただし、Javaの例では、キーと値のリストをinoutとして取得するこのインターフェイスを見ました。つまり、中間マップの値は、削減される前にキーごとにグループ化され、レデューサーは並行して実行できます。
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
これはJavaのみの機能ですか?または、Rubyを使用してHadoopストリーミングでそれを実行できますか?
解決
ストリーミングを使用しているかどうかにかかわらず、Reducerは常に並行して実行されます(表示されない場合は、ジョブ構成が複数のリデュースタスクを許可するように設定されていることを確認してください-クラスター内のmapred.reduce.tasksを参照)またはジョブ構成)。違いは、Javaを使用する場合とストリーミングを使用する場合に、フレームワークがパッケージ化を少しうまく行うことです。
Javaの場合、reduceタスクは特定のキーのすべての値の反復子を取得します。これにより、たとえば、reduceタスクでマップ出力を合計する場合に、値を簡単に調べることができます。ストリーミングでは、文字通りキーと値のペアのストリームを取得します。値はキーによって順序付けられ、特定のキーのリデュースタスク間で分割されないことが保証されますが、必要な状態の追跡はユーザー次第です。 たとえば、Javaでは、マップ出力は次の形式でレデューサーにシンボリックに送信されます
key1、{val1、val2、val3} key2、{val7、val8}
ストリーミングでは、代わりに出力は次のようになります
key1、val1 key1、val2 key1、val3 key2、val7 key2、val8
たとえば、各キーの値の合計を計算するレデューサーを作成するには、最後に見たキーを保存する変数と、合計を保存する変数が必要です。新しいキーと値のペアを読み取るたびに、次の操作を実行します。
- キーが最後のキーと異なるかどうかを確認します。
- その場合、キーと現在の合計を出力し、合計をゼロにリセットします。
- 現在の値を合計に追加し、最後のキーを現在のキーに設定します。
HTH。
他のヒント
自分でHadoop Streamingを試したことはありませんが、ドキュメントを読むことで、同様の並列動作を実現できると思います。
関連付けられた値を持つキーを各レデューサーに渡す代わりに、ストリーミングはキーごとにマッパー出力をグループ化します。また、同じキーを持つ値が複数のレデューサーに分割されないことも保証します。これは通常のHadoop機能とは多少異なりますが、それでも、reduce作業は複数のreducerに分散されます。
-verbose
オプションを使用して、実際に何が起こっているかについての詳細情報を取得してください。また、 -D mapred.reduce.tasks = X
オプションを試してみることができます。ここで、Xはリデューサーの希望数です。