I want to write my own word count example using MapReduce and hadoop v. 1.0.3 (I'm on MacOS) but i don't understand why it doesn't work Sharing my code :

main:

package org.myorg;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        // set job name, mapper, combiner, and reducer classes
        conf.setJobName("WordCount");
        // set input and output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        // set input and output paths
        //FileInputFormat. setInputPaths(conf, new Path(input));
        //FileOutputFormat.setOutputPath(conf, new Path(output));
        FileOutputFormat.setCompressOutput(conf, false);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(org.myorg.Map.class);
        conf.setReducerClass(org.myorg.Reduce.class);
        String host = args[0];
        String input = host + "/" + args[1];
        String output = host + "/" + args[2];
        // set input and output paths
        FileInputFormat.addInputPath(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));
        JobClient j=new JobClient(conf);
        (j.submitJob(conf)).waitForCompletion();
    }
}

Mapper:

package org.myorg;

import java.io.IOException;
import java.util.HashMap;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.Vector;
import java.util.Map.Entry;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;

public class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>  {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {

        MapWritable hs = new MapWritable();
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            //hs.put(word, one);
            output.collect(word,one);
        }
        // TODO Auto-generated method stub
    }
}

Reducer:

package org.myorg;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.math.RoundingMode;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Vector;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Reducer.Context;


//public class Reduce extends MapReduceBase implements Reducer<Text, MapWritable, Text, Text> {
public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        String host = "hdfs://localhost:54310/";
        String tmp = host + "Temporany/output.txt";
        FileSystem srcFS;
        try {
            srcFS = FileSystem.get(new URI(tmp), new JobConf());
            srcFS.delete(new Path(tmp), true);
            BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(
            srcFS.create(new Path(tmp))));
            wr.write(key.toString() + ":" + sum);
            wr.close();
            srcFS.close();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        //   context.write(key, new IntWritable(sum));
    }
}

The Job started and end with no error but didn't write the output file. I launch the jar with Hadoop with this command:

./Hadoop jar /Users/User/Desktop/hadoop/wordcount.jar hdfs://localhost:54310 /In/testo.txt /Out/wordcount17

This is the output:

2014-03-03 17:56:22.063 java[6365:1203] Unable to load realm info from SCDynamicStore
14/03/03 17:56:22 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/03/03 17:56:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/03/03 17:56:23 WARN snappy.LoadSnappy: Snappy native library not loaded
14/03/03 17:56:23 INFO mapred.FileInputFormat: Total input paths to process : 1

I suppose the problem is "unable to load native-hadoop library" but works fine for other Jar's.

有帮助吗?

解决方案

Q : The Job start and end with no error but don't write the output-file ??

Ans : I am not sure that job successfully ends w/o error .

Problems :

  • Job Configuration:

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);
    

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same. If they are different, then the map output types can be set using the methods setMapOutputKeyClass() and setMapOutputValueClass().

The Output class in your case is :

Map key : Text
Map Value : IntWritable 
Reduce key : Text 
Reduce Value : Text

Which will result in Type mismatch exception

  • Reduce

I am not sure why you are using hdfs API to write your output to a file ?.

Should use output.collect(key,value).

In case of multiple reducer are you handling the simultaneous write operation? And I wonder what context.write is doing in old apis (it's commented )? .


You can use following for more information

Debug your map-reduce Job :


Q. Difference b/w SubmitJob() && waitForCompletion() ?

Ans : SubmitJob() : submits the job and ends .

waitForCompletion() : submits the job and prints the status of the job on console . So waitForCompletion() is SubmitJob()+status update of job until complete.


word count

Please read

  • Map Reduce Apache
  • You can also find hadoop-examples-X.X.X.jar in your installation folder.
  • Go through $HADOOP_HOME/src/expalmes/ for source code .

**$HADOOP_HOME = hadoop installation folder

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top