EMR Streaming job using Java code for mapper and reducer
Question
I currently run streaming jobs whose mapper and reducer code is written in Ruby, and I want to convert these to Java. I do not know how to run a streaming job on EMR Hadoop using Java, and the CloudBurst sample on Amazon's EMR website is too complex. Here are the details of how I run the jobs currently.
Code to start a job:
elastic-mapreduce --create --alive --plain-output --master-instance-type m1.small
--slave-instance-type m1.xlarge --num-instances 2 --name "Job Name" --bootstrap-action
s3://bucket-path/bootstrap.sh
Code to add a step:
elastic-mapreduce -j <job_id> --stream --step-name "my_step_name"
--jobconf mapred.task.timeout=0 --mapper s3://bucket-path/mapper.rb
--reducer s3://bucket-path/reducerRules.rb --cache s3://bucket-path/cache/cache.txt
--input s3://bucket-path/input --output s3://bucket-path/output
The mapper code reads from a CSV file, passed above as EMR's cache argument, as well as from the input S3 bucket, which also holds some CSV files; it does some calculations and prints CSV output lines to standard output.
# mapper.rb
CSV_OPTIONS = {
  # some CSV options
}

File.open("cache.txt") do |file|
  while (line = file.gets)
    # do something
  end
end

input = FasterCSV.new(STDIN, CSV_OPTIONS)
input.each do |row|
  # do calculations and get result
  puts(result)
end
# reducer.rb
$stdin.each_line do |line|
  # do some aggregations and get aggregation_result
  puts(aggregation_result) if some_condition
end
Solution 2
Now that I have a better grasp of Hadoop and MapReduce, here is what I had expected:
To start a cluster, the code remains more or less the same as in the question, but we can add configuration parameters:
ruby elastic-mapreduce --create --alive --plain-output --master-instance-type m1.xlarge --slave-instance-type m1.xlarge --num-instances 11 --name "Java Pipeline" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/install-ganglia --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args "--mapred-config-file, s3://com.versata.emr/conf/mapred-site-tuned.xml"
To add Job Steps:
Step 1:
ruby elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-one.jar --arg s3://somepath/input-one --arg s3://somepath/output-one --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0
Step 2:
ruby elastic-mapreduce --jobflow <jobflow_id> --jar s3://somepath/job-two.jar --arg s3://somepath/output-one --arg s3://somepath/output-two --args -m,mapred.min.split.size=52880 -m,mapred.task.timeout=0
As for the Java code, there will be one main class containing one implementation each of the following classes:
- org.apache.hadoop.mapreduce.Mapper;
- org.apache.hadoop.mapreduce.Reducer;
Each of these must override the map() and reduce() methods, respectively, to do the desired job.
The Java class for the problem in question would look like the following:
import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SomeJob extends Configured implements Tool {

    private static final String JOB_NAME = "My Job";

    /**
     * This is the Mapper.
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the cached file
            Path file = DistributedCache.getLocalCacheFiles(context.getConfiguration())[0];
            File fileObject = new File(file.toString());
            // Do whatever is required with the file data
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputValue.set("Some value calculated or derived");
            context.write(outputKey, outputValue);
        }
    }

    /**
     * This is the Reducer.
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            outputKey.set("Some key calculated or derived");
            outputValue.set("Some value calculated or derived");
            context.write(outputKey, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        try {
            Configuration conf = getConf();
            DistributedCache.addCacheFile(new URI(args[2]), conf);

            Job job = new Job(conf);
            job.setJarByClass(SomeJob.class);
            job.setJobName(JOB_NAME);

            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("Usage: SomeJob <comma separated list of input directories> <output dir> <cache file>");
            System.exit(-1);
        }
        int result = ToolRunner.run(new SomeJob(), args);
        System.exit(result);
    }
}
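The map() body above is only a placeholder. Since the original Ruby mapper parses CSV lines, here is a minimal, self-contained sketch (plain Java, no Hadoop dependency) of the kind of key/value derivation it would perform; the column layout (key in the first column, value fields in the rest) is an assumption for illustration, not the actual schema:

```java
// Sketch of the per-line CSV handling a real map() body might do.
// Assumes (hypothetically) the key is in column 0 and the value is the
// remaining columns; adjust to the real CSV layout.
public class CsvLineParser {

    /** Splits one CSV line into a key (first column) and a value (remaining columns). */
    public static String[] toKeyValue(String line) {
        // Naive split; a real CSV with quoted fields would need a proper parser.
        String[] cols = line.split(",", 2);
        String key = cols[0].trim();
        String value = cols.length > 1 ? cols[1].trim() : "";
        return new String[] { key, value };
    }

    public static void main(String[] args) {
        String[] kv = toKeyValue("user42,12.5,2011-08-01");
        System.out.println(kv[0] + " -> " + kv[1]);
    }
}
```

In the Hadoop mapper, toKeyValue(value.toString()) would feed outputKey.set(...) and outputValue.set(...) before context.write().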
Other tips
You don't use streaming if you're using Java; you build a jar directly against the MapReduce API.
Check out the examples folder of the hadoop source for some good examples of how to do this, including the infamous wordcount: https://github.com/apache/hadoop/tree/trunk/src/examples/org/apache/hadoop/examples
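For orientation, that wordcount example pairs a tokenizing mapper with a summing reducer. Stripped of the Hadoop API, the core logic is just the following plain-Java sketch (not the actual example code; Hadoop additionally shuffles the (word, 1) pairs between the two phases):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the wordcount map/reduce logic: tokenize each line
// ("map"), then sum the counts per token ("reduce").
public class WordCountSketch {

    public static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new HashMap<>();
        for (String token : text.split("\\s+")) {    // "map": emit one token at a time
            if (token.isEmpty()) continue;
            counts.merge(token, 1, Integer::sum);    // "reduce": sum per key
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("to be or not to be"));
    }
}
```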
I'm not totally sure why you want to use Java, since coding directly against the APIs can be painful. You might want to try one of the following instead.
Java projects:
Non-Java:
- Hive (sql-like) https://cwiki.apache.org/confluence/display/Hive/Home
- Pig http://pig.apache.org/#Getting+Started
- Scoobi (scala) https://github.com/NICTA/scoobi
FWIW, I think Pig would probably be my pick, and it is supported out of the box on EMR.