Avro Map-Reduce on Oozie
14-06-2021
Question
I have been trying to run an Avro map-reduce job on Oozie. I specify the mapper and reducer classes in the workflow.xml and provide the other configs too, but it fails with:
java.lang.RuntimeException: class mr.sales.avro.etl.SalesMapper not org.apache.hadoop.mapred.Mapper
The same job, when run directly on a Hadoop cluster (and not via Oozie), completes and gives the desired output. So it seems probable that I am missing some Oozie config. What I guess from the exception is that Oozie requires the mapper to be a subclass of org.apache.hadoop.mapred.Mapper, but Avro mappers have a different signature: they extend org.apache.avro.mapred.AvroMapper, and this may be the reason for the error.
So my question is: how do I configure the Oozie workflow/properties file to allow it to run an Avro map-reduce job?
Solution
With Avro, you'll need to configure a few extra properties:
- org.apache.avro.mapred.HadoopMapper is the actual mapper class you need to set (this is the class that implements the org.apache.hadoop.mapred.Mapper interface)
- the avro.mapper property should name your SalesMapper class
There are analogous properties for the combiner and reducer too - check the AvroJob source and its utility methods.
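For reference, a driver sketch showing what those utility methods set up under the hood (assuming the old org.apache.avro.mapred API; SalesReducer and the output schema are illustrative, only SalesMapper is named in the question):

```java
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class SalesJobDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(SalesJobDriver.class);
        conf.setJobName("avro-sales");

        // AvroJob.setMapperClass stores the AvroMapper subclass under the
        // "avro.mapper" property and sets mapred.mapper.class to
        // org.apache.avro.mapred.HadoopMapper -- the same two properties
        // the Oozie workflow.xml has to declare explicitly.
        AvroJob.setMapperClass(conf, mr.sales.avro.etl.SalesMapper.class);
        AvroJob.setReducerClass(conf, mr.sales.avro.etl.SalesReducer.class);

        // These calls populate avro.input.schema / avro.output.schema;
        // the Pair schema here is a placeholder for your real output schema.
        AvroJob.setInputSchema(conf, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputSchema(conf, Pair.getPairSchema(
                Schema.create(Schema.Type.STRING),
                Schema.create(Schema.Type.LONG)));

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}
```

Running this once against the cluster and then reading back the resolved job configuration is the quickest way to see every property Oozie needs.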
Another way of doing this is to examine the job.xml from a job you submitted manually, and copy the relevant configuration properties over to your Oozie workflow.xml.
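That copying can be done mechanically. A sketch using only the JDK's built-in XML parser (JobXmlDump is a hypothetical helper name; job.xml is the flat property list Hadoop writes for every submitted job):

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class JobXmlDump {
    /** Read the flat <property><name/><value/></property> list in a job.xml. */
    static Map<String, String> readProps(File jobXml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder().parse(jobXml);
        NodeList props = doc.getElementsByTagName("property");
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < props.getLength(); i++) {
            Element p = (Element) props.item(i);
            out.put(p.getElementsByTagName("name").item(0).getTextContent(),
                    p.getElementsByTagName("value").item(0).getTextContent());
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        for (Map.Entry<String, String> e : readProps(new File(args[0])).entrySet()) {
            // Print only the entries worth copying into the Oozie workflow.xml
            if (e.getKey().startsWith("avro.") || e.getKey().startsWith("mapred.")) {
                System.out.println(e.getKey() + " = " + e.getValue());
            }
        }
    }
}
```

Each printed name/value pair maps directly onto one `<property>` element in the workflow's `<configuration>` block.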
Other tips
I've been having the same problem this week. Here is my workflow.xml (modified):
<workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2">
<start to='start_here'/>
<action name='start_here'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/${wf:user()}/output"/>
</prepare>
<configuration>
<property>
<name>mapred.input.dir</name>
<value>/user/${wf:user()}/input</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/user/${wf:user()}/output</value>
</property>
<property>
<name>mapred.mapper.class</name>
<value>org.apache.avro.mapred.HadoopMapper</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.apache.avro.mapred.HadoopReducer</value>
</property>
<property>
<name>avro.mapper</name>
<value>package.for.my.Mapper</value>
</property>
<property>
<name>avro.reducer</name>
<value>package.for.my.Reducer</value>
</property>
<property>
<name>mapred.input.format.class</name>
<value>org.apache.avro.mapred.AvroUtf8InputFormat</value>
</property>
<property>
<name>mapred.output.format.class</name>
<value>org.apache.avro.mapred.AvroOutputFormat</value>
</property>
<property>
<name>mapred.output.key.class</name>
<value>org.apache.avro.mapred.AvroWrapper</value>
</property>
<property>
<name>mapred.mapoutput.key.class</name>
<value>org.apache.avro.mapred.AvroKey</value>
</property>
<property>
<name>mapred.mapoutput.value.class</name>
<value>org.apache.avro.mapred.AvroValue</value>
</property>
<property>
<name>avro.map.output.schema</name>
<value>{put your schema here from job.xml via manual run}</value>
</property>
<property>
<name>avro.input.schema</name>
<value>"string"</value>
</property>
<property>
<name>avro.output.schema</name>
<value>{put your schema here from job.xml via manual run}</value>
</property>
<property>
<name>mapred.output.key.comparator.class</name>
<value>org.apache.avro.mapred.AvroKeyComparator</value>
</property>
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value>
</property>
</configuration>
</map-reduce>
<ok to='end'/>
<error to='fail'/>
</action>
<kill name='fail'>
<message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end'/>
</workflow-app>
You may need to modify this a bit more depending on the inputs and outputs of your map-reduce job.
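For context, the class named by avro.mapper in a workflow like this must extend AvroMapper, not the Hadoop Mapper. A sketch against the old org.apache.avro.mapred API, matching avro.input.schema = "string" above (the class name and the token logic are illustrative, not from the original post):

```java
import java.io.IOException;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.mapred.Reporter;

// avro.input.schema = "string" means each input record arrives as a Utf8 line.
public class SampleAvroMapper extends AvroMapper<Utf8, Pair<Utf8, Long>> {
    @Override
    public void map(Utf8 line, AvroCollector<Pair<Utf8, Long>> collector,
                    Reporter reporter) throws IOException {
        // Emit (first token, 1). The Pair schema of this output type is what
        // goes into avro.map.output.schema in the workflow configuration.
        String token = line.toString().split("\\s+")[0];
        collector.collect(new Pair<Utf8, Long>(new Utf8(token), 1L));
    }
}
```

Note that such a class never appears in mapred.mapper.class; that property always stays org.apache.avro.mapred.HadoopMapper, which delegates to the class named by avro.mapper.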
Can you post your mapper and reducer classes as well? My Oozie workflow is working fine, but the output file is not an .avro file. Here is my workflow:
<workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2">
<start to='start_here'/>
<action name='start_here'>
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/hadoop/${workFlowRoot}/final-output-data"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.reducer.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
<value>true</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>/user/hadoop/${workFlowRoot}/input-data</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/user/hadoop/${workFlowRoot}/final-output-data</value>
</property>
<property>
<name>mapreduce.mapper.class</name>
<value>org.apache.avro.mapred.HadoopMapper</value>
</property>
<property>
<name>mapreduce.reducer.class</name>
<value>org.apache.avro.mapred.HadoopReducer</value>
</property>
<property>
<name>avro.mapper</name>
<value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionMapper</value>
</property>
<property>
<name>avro.reducer</name>
<value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionReducer</value>
</property>
<property>
<name>mapreduce.input.format.class</name>
<value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
</property>
<property>
<name>avro.schema.input.key</name>
<value>{... schema ...}</value>
</property>
<property>
<name>mapreduce.mapoutput.key.class</name>
<value>org.apache.hadoop.io.AvroKey</value>
</property>
<property>
<name>avro.map.output.schema.key</name>
<value>{... schema ...}</value>
</property>
<property>
<name>mapreduce.mapoutput.value.class</name>
<value>org.apache.hadoop.io.Text</value>
</property>
<property>
<name>mapreduce.output.format.class</name>
<value>org.apache.avro.mapred.AvroKeyValueOutputFormat</value>
</property>
<property>
<name>mapreduce.output.key.class</name>
<value>org.apache.avro.mapred.AvroKey</value>
</property>
<property>
<name>mapreduce.output.value.class</name>
<value>org.apache.avro.mapred.AvroValue</value>
</property>
<property>
<name>avro.schema.output.key</name>
<value>{ .... schema .... }</value>
</property>
<property>
<name>avro.schema.output.value</name>
<value>"string"</value>
</property>
<property>
<name>mapreduce.output.key.comparator.class</name>
<value>org.apache.avro.mapred.AvroKeyComparator</value>
</property>
<property>
<name>io.serializations</name>
<value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization
</value>
</property>
</configuration>
</map-reduce>
<ok to='end'/>
<error to='fail'/>
</action>
<kill name='fail'>
<message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end'/>
</workflow-app>
And my mapper and reducer are defined like these :
public static class CFDetectionMapper extends
        Mapper<AvroKey<AdClickFraudSignalsEntity>, NullWritable, AvroKey<AdClickFraudSignalsEntity>, Text> {}

public static class CFDetectionReducer extends
        Reducer<AvroKey<AdClickFraudSignalsEntity>, Text, AvroKey<AdClickFraudSignalsEntity>, AvroValue<CharSequence>> {}