Question

All of my programs are writing with hadoop's new MR1 interfaces (org.apache.hadoop.mapreduce), so I want to use new org.apache.avro.mapreduce of avro too. But it doesn't work for me.

The program takes input of avro data and output the same. The main idea behind my program is subclassing hadoop's Mapper and Reducer against avro wrapped key/value. Here is a block of my job driver :

    AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

    job.setMapperClass(MyAvroMap.class);
    job.setReducerClass(MyAvroReduce.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

The definitions of MyAvroMap and MyAvroReduce subclasses respectivly are

public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }

public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                AvroKey<NetflowRecord>, NullWritable>{ ... }

The methioned NetflowRecord is my avro record class. And I got running exception

java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey

By reading hadoop's and avro's source code, I found that the exception was thrown by JobConf to make sure the map key is a subclass of WritableComparable, like this (hadoop1.2.1, line759)

WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));

But the avro shows that AvroKey and AvroValue are just a simple wrapper without subclassing Writable* interfaces of hadoop.

I believe that, even without testing, I can get through using old mapred interfaces, but its not what I want. Can u give me some examples or explaination about programming with pure org.apache.avro.mapreduce interfaces??

Sincerely,

Jamin

Was it helpful?

Solution

After hard searching, with the help of this patch https://issues.apache.org/jira/browse/AVRO-593, I figure out that each AvroKey and AvroValue wrapper must have a schema definition in the job configuration. That's what I missed.

Here I have two options:

  1. If remaining the MyAvroMap and MyAvroReduce unchanged, I have to define a schema for CharSequence and declare this schema with AvroJob for Mapper output, such as

    AvroJob.setMapOutputKeySchema(job, <"defined-schema-for-charsequence">); AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema());

  2. By changing the Mapper output key/value as Text/AvroValue, I need only to add schema declaration for Mapper output value, like

    job.setMapOutputKeyClass(Text.class); AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema());

With the mapreduce API, we do not require to subclass AvroMapper and AvroReducer any more. Here, I implement option2 without addtional schema definition in my code.

Jamin

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top