Question

I'm trying to run a Hadoop streaming job with a Java Mapper/Reducer over some Wikipedia dumps (in compressed bz2 form). I'm trying to use WikiHadoop, an input format that Wikimedia released recently.

WikiReader_Mapper.java

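package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every input record (one per article).
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}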

WikiReader_Reducer.java

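package courseproj.example;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer: sums up all the counts for a key.
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Reuse the output object to save overhead of object creation.
    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }
}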

The command I'm running is:

hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
       -libjars lib2/wikihadoop-0.2.jar \
       -D mapreduce.input.fileinputformat.split.minsize=300000000 \
       -D mapreduce.task.timeout=6000000 \
       -D org.wikimedia.wikihadoop.previousRevision=false \
       -input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
       -output out \
       -inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
       -mapper WikiReader_Mapper \
       -reducer WikiReader_Reducer

and the error messages I'm getting are:

Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152)

Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103)

Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:460)
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)

I'm more familiar with the new Hadoop API than with the old one. Since my mapper and reducer code is in two different files, where do I define the JobConf configuration parameters for the job while still following the command structure of Hadoop streaming (explicitly setting the mapper and reducer classes)? Is there a way to wrap the mapper and reducer code into one class (one that extends Configured and implements Tool, as is done in the new API) and pass that class name to the Hadoop streaming command line instead of setting the map and reduce classes separately?


Solution

Streaming uses the old API (org.apache.hadoop.mapred), so your mapper and reducer classes must be written against it and must not extend the new API classes (org.apache.hadoop.mapreduce).

Try changing your mapper to implement org.apache.hadoop.mapred.Mapper, and reducer to implement org.apache.hadoop.mapred.Reducer, for example:

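package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article occurrence.
// MapReduceBase supplies no-op configure() and close(), which the old-API Mapper interface requires.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

  // Reuse objects to save overhead of object creation.
  private final static Text KEY = new Text();
  private final static IntWritable VALUE = new IntWritable(1);

  // The old-API map() declares only IOException, so the override cannot add InterruptedException.
  @Override
  public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
      throws IOException {
    KEY.set("article count");
    collector.collect(KEY, VALUE);
  }
}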
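
It is also worth looking at the last "Caused by" in your stack trace: PipeMapRed tried to execute "WikiReader_Mapper" as an external program. That suggests streaming could not load a class by that bare name and fell back to treating the -mapper value as a command. The plain-Java sketch below only illustrates that kind of class-or-command decision; MapperArgResolver and its methods are hypothetical names made up for this example, not Hadoop APIs, and the real streaming code may differ in detail.

```java
import java.util.Optional;

/**
 * Illustrative sketch only (hypothetical names, not part of Hadoop):
 * decide whether a -mapper style argument names a loadable Java class
 * or should be treated as an external command.
 */
public class MapperArgResolver {

    /** Returns the class if the name can be loaded from the classpath, otherwise empty. */
    public static Optional<Class<?>> loadableClass(String name) {
        try {
            return Optional.of(Class.forName(name));
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    /** Describes how the argument would be treated under this sketch. */
    public static String describe(String mapperArg) {
        return loadableClass(mapperArg)
                .map(c -> "class: " + c.getName())
                .orElse("command: " + mapperArg);
    }

    public static void main(String[] args) {
        // Bare name without a package: the class lookup fails, so it is treated as a command.
        System.out.println(describe("WikiReader_Mapper"));
        // A fully qualified name that is on the classpath resolves to a class.
        System.out.println(describe("java.lang.String"));
    }
}
```

If this is what is happening in your job, passing the fully qualified names (-mapper courseproj.example.WikiReader_Mapper and -reducer courseproj.example.WikiReader_Reducer) and adding the jar that contains those classes to -libjars would be the first thing to try.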

Licensed under: CC-BY-SA with attribution