Question

Here is the error I receive:

    14/02/28 02:52:43 INFO mapred.JobClient: Task Id : attempt_201402271927_0020_m_000001_2, Status : FAILED
java.lang.NullPointerException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:843)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:376)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:85)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:584)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:656)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

I've commented out most of my code, so the mapper essentially takes in the usual LongWritable and Text and just outputs a constant IntWritable 1 and an empty Weather object (a custom class):

Here is my mapper class:

public class Map extends Mapper<LongWritable, Text, IntWritable, Weather> {

    private IntWritable id = new IntWritable(1);
    private Weather we = new Weather();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //String s;
        //String line = value.toString();

        //int start[] =   {0,18,31,42,53,64,74,84,88,103};
        //int end[] =     {6,22,33,44,55,66,76,86,93,108};

        //if(line.length() > 108) {
            // create the object to hold our data
            // getStuff()
            // parse the string

            // push the object onto our data structure
            context.write(id, we);
        //}
    }
}

Here is my reducer:

public class Reduce extends Reducer<IntWritable, Weather, IntWritable, Text> {
    private Text text = new Text("one");
    private IntWritable one = new IntWritable(1);
    public void reduce(IntWritable key, Iterable<Weather> weather, Context context)
        throws IOException, InterruptedException {
        //for(Weather w : weather) {
        //    text.set(w.toString());
        context.write(one, text);
    }
}

Here is my main:

public class Skyline {

    public static void main(String[] args) throws IOException{
        //String s = args[0].length() > 0 ? args[0] : "skyline.in";
        Path input, output;
        Configuration conf = new Configuration();

        conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");
        try {
            input = new Path(args[0]);
        } catch(ArrayIndexOutOfBoundsException e) {
            input = new Path("hdfs://localhost/user/cloudera/in/skyline.in");
        }
        try {
            output = new Path(args[1]);
            //FileSystem.getLocal(conf).delete(output, true);
        } catch(ArrayIndexOutOfBoundsException e) {
            output = new Path("hdfs://localhost/user/cloudera/out/");
            //FileSystem.getLocal(conf).delete(output, true);
        }

        Job job = new Job(conf, "skyline");

        job.setJarByClass(Skyline.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Weather.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);
        try {
            job.waitForCompletion(true);
        } catch(InterruptedException e) {
            System.out.println("Interrupted Exception");
        } catch(ClassNotFoundException e) {
            System.out.println("ClassNotFoundException");
        }
    }
}

Here is a sample of my Weather class:

public class Weather {

    private int stationId;

    public Weather(){}

    public int getStation(){return this.stationId;}
    public void setStation(int r){this.stationId = r;}
    //...24 additional things of ints, doubles and strings
}

I'm at my wits' end. At this point I have a shell of a program that does nothing, and I'm still receiving the error. I've read up on Java generics to make sure I'm using them correctly (I think I am). I'm very green to the MapReduce paradigm, but this program is just a shell, modified from the MapReduce tutorial (https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Walk-through).

Solution

The problem is that the class you're using for map() output / reduce() input, Weather, does not implement Writable. As a result, the SerializationFactory cannot find a serializer for your values, which is consistent with the NullPointerException thrown from MapOutputBuffer.init in your stack trace.

The underlying conceptual problem is that Hadoop does not know how to serialize your data type to disk and read it back. That is a mandatory step, because the data has to be persisted before it can be moved from the map task to a reducer (in general, the two can run on separate nodes).

So what you want to do is make Weather implement Writable and add the serialization routines (write and readFields) to your custom data type.
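As a rough illustration of that advice, here is a minimal sketch of what a Writable version of Weather might look like. Only stationId and its accessors come from the question; the temperature and stationName fields, their accessors, and the WeatherRoundTrip helper are hypothetical, added to show one int, one double and one String. The small Writable interface declared at the top is a stand-in with the same two methods as org.apache.hadoop.io.Writable, so that the sketch compiles without Hadoop on the classpath; in the real job you would import Hadoop's interface instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in with the same two methods as org.apache.hadoop.io.Writable, so this
// sketch compiles without Hadoop. In the real job, delete this interface and
// import org.apache.hadoop.io.Writable instead.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// In the real job this would be a public class in its own Weather.java file.
class Weather implements Writable {
    private int stationId;
    private double temperature;      // hypothetical field, not in the question
    private String stationName = ""; // hypothetical field; kept non-null because writeUTF rejects null

    // Hadoop creates instances reflectively, so keep the no-argument constructor.
    public Weather() {}

    public int getStation() { return stationId; }
    public void setStation(int r) { stationId = r; }
    public double getTemperature() { return temperature; }
    public void setTemperature(double t) { temperature = t; }
    public String getStationName() { return stationName; }
    public void setStationName(String n) { stationName = (n == null) ? "" : n; }

    // Write every field, in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(stationId);
        out.writeDouble(temperature);
        out.writeUTF(stationName);
    }

    // Read the fields back in exactly the order write() emitted them.
    @Override
    public void readFields(DataInput in) throws IOException {
        stationId = in.readInt();
        temperature = in.readDouble();
        stationName = in.readUTF();
    }
}

// Hypothetical helper: serializes a Weather to bytes and reads it back,
// roughly what happens between the map and reduce phases.
class WeatherRoundTrip {
    static Weather copy(Weather original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            Weather restored = new Weather();
            restored.readFields(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return restored;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The important detail is that write and readFields handle the same fields in the same order; the remaining 24 fields from the question would be added to both methods in the same way. Note that this only covers using Weather as a value. If it were ever used as a key, it would need to implement WritableComparable so Hadoop can sort it.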

Licensed under: CC-BY-SA with attribution