Question

Getting the following error when I try to run mapreduce on avro:

14/02/26 20:07:50 INFO mapreduce.Job: Task Id : attempt_1393424169778_0002_m_000001_0, Status : FAILED
Error: org.apache.avro.generic.GenericData.createDatumWriter(Lorg/apache/avro/Schema;)Lorg/apache/avro/io/DatumWriter;

How can I fix this?

I have Hadoop 2.2 up and running.
I'm using Avro 1.7.6.

Below is the code:

package avroColorCount;
import java.io.IOException;

import org.apache.avro.*;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class MapredColorCount extends Configured implements Tool {

  public static class ColorCountMapper extends AvroMapper<User, Pair<CharSequence, Integer>> {
    @Override
    public void map(User user, AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter)
        throws IOException {
      CharSequence color = user.getFavoriteColor();
      // We need this check because the User.favorite_color field has type ["string", "null"]
      if (color == null) {
        color = "none";
      }
      collector.collect(new Pair<CharSequence, Integer>(color, 1));
    }
  }

  public static class ColorCountReducer extends AvroReducer<CharSequence, Integer,
                                                            Pair<CharSequence, Integer>> {
    @Override
    public void reduce(CharSequence key, Iterable<Integer> values,
                       AvroCollector<Pair<CharSequence, Integer>> collector,
                       Reporter reporter)
        throws IOException {
      int sum = 0;
      for (Integer value : values) {
        sum += value;
      }
      collector.collect(new Pair<CharSequence, Integer>(key, sum));
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MapredColorCount <input path> <output path>");
      return -1;
    }

    JobConf conf = new JobConf(getConf(), MapredColorCount.class);
    conf.setJobName("colorcount");

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    AvroJob.setMapperClass(conf, ColorCountMapper.class);
    AvroJob.setReducerClass(conf, ColorCountReducer.class);

    // Note that AvroJob.setInputSchema and AvroJob.setOutputSchema set
    // relevant config options such as input/output format, map output
    // classes, and output key class.
    AvroJob.setInputSchema(conf, User.getClassSchema());
    AvroJob.setOutputSchema(conf, Pair.getPairSchema(Schema.create(Type.STRING),
        Schema.create(Type.INT)));

    JobClient.runJob(conf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MapredColorCount(), args);
    System.exit(res);
  }
}
Was it helpful?

Solution

You're using wrong version of avro library. createDatumWriter method first appeared in GenericData class in version 1.7.5 of avro library. If Hadoop does not seem to find it, then it means that there is an earlier version of avro library (possibly 1.7.4) in your classpath.

First try to provide a correct version of library with HADOOP_CLASSPATH or -libjars option.

Unfortunately, it may be more tricky. In my case it was some other jar file that I loaded with my project but actually never used. I spent several weeks do find it. Hope now you will find it quicker.

Here is some handy code to help you analyze your classpath during your job run (use it inside working job, like WordCount example):

public static void printClassPath() {
    ClassLoader cl = ClassLoader.getSystemClassLoader();
        URL[] urls = ((URLClassLoader) cl).getURLs();
    System.out.println("classpath BEGIN");
    for (URL url : urls) {
        System.out.println(url.getFile());
    }
    System.out.println("classpath END");
}

Hope it helps.

OTHER TIPS

Viacheslav Rodionov's answer definitely points to the root cause. Thank you for posting! The following configuration setting then seemed to pick up the 1.7.6 library first and allowed my reducer code (where the createDatumWriter method was called) to complete successfully:

Configuration conf = getConf();
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
Job job = Job.getInstance(conf);

I ran exactly into the same problem and as Viacheslav suggested -- it's a version conflict between Avro installed with Hadoop distribution, and Avro version in your project.

And it seems the most reliable way to solve the problem -- simply just use Avro version installed with your Hadoop distro. Unless there is compelling reason to use different version.

Why is using default Avro version which comes with Hadoop distribution is good idea? Because in production hadoop environment you most likely will deal numerous other jobs and services running on the same shared hadoop infrastructure. And the all share the same jar dependencies which come with Hadoop distribution installed in your production environment.
Replacing jar version for specific mapreduce job maybe tricky but solvable task. However it creates a risk of introducing compatibility problem which may be very hard to detect and can backfire later somewhere else in your hadoop ecosystem.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top