Getting java.lang.ClassCastException: class java.lang.String when running a simple MapReduce program

StackOverflow https://stackoverflow.com/questions/21722173

Question

I am trying to run a simple MapReduce program in which the mapper takes each input line and splits it into two parts (key => String, value => Integer). The reducer sums the values for each key. I get a ClassCastException every time, and I cannot work out what in the code is causing it.

My Code:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class Test {
public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, String, Integer> {

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<String, Integer> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String[] lineParts = line.split(",");
        output.collect(lineParts[0], Integer.parseInt(lineParts[1]));

    }
}

public static class Reduce extends MapReduceBase implements
        Reducer<String, Integer, String, Integer> {

    @Override
    public void reduce(String key, Iterator<Integer> values,
            OutputCollector<String, Integer> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum = sum + values.next();
        }
        output.collect(key, sum);
    }
}

public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(Test.class);
    conf.setJobName("ProductCount");

    conf.setMapOutputKeyClass(String.class);
    conf.setMapOutputValueClass(Integer.class);

    conf.setOutputKeyClass(String.class);
    conf.setOutputValueClass(Integer.class);

    conf.setMapperClass(Map.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

}
}

Sample Data:

abc,10
abc,10
abc,10
def,9
def,9

The stack trace follows. Does it have anything to do with my key/value types?

14/02/11 23:57:35 INFO mapred.JobClient: Task Id : attempt_201402110240_0013_m_000001_2, Status : FAILED
java.lang.ClassCastException: class java.lang.String
at java.lang.Class.asSubclass(Class.java:3018)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:816)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:382)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:324)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:262)


Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1246)
at Test.main(Test.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:186)

Solution

It looks like you are not using the correct classes for the map and reduce output types.

From one of the MapReduce Tutorials:

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.

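Therefore, you should replace String.class with Text.class and Integer.class with IntWritable.class in the job configuration, and make the same change in the type parameters of your Mapper, Reducer, and OutputCollector declarations.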

I hope that fixes your problem.
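To see why the exception surfaces where it does: the stack trace in the question shows Class.asSubclass being called from JobConf.getOutputKeyComparator. The sketch below reproduces that kind of check with plain Java. SortableKey, TextLikeKey, and KeyClassCheck are hypothetical stand-ins invented for illustration, not Hadoop types, and the exact check Hadoop performs is assumed rather than copied from its source.

```java
// Hypothetical stand-in for a framework key contract such as WritableComparable.
interface SortableKey extends Comparable<SortableKey> {}

// Hypothetical key type that satisfies the contract, playing the role Text plays in Hadoop.
final class TextLikeKey implements SortableKey {
    private final String value;

    TextLikeKey(String value) {
        this.value = value;
    }

    @Override
    public int compareTo(SortableKey other) {
        return value.compareTo(other.toString());
    }

    @Override
    public String toString() {
        return value;
    }
}

final class KeyClassCheck {
    // Returns true if the configured key class implements the contract.
    // Class.asSubclass throws ClassCastException when it does not.
    static boolean isUsableKey(Class<?> keyClass) {
        try {
            keyClass.asSubclass(SortableKey.class);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("String usable as key: " + isUsableKey(String.class));
        System.out.println("TextLikeKey usable as key: " + isUsableKey(TextLikeKey.class));
    }
}
```

java.lang.String is Comparable, but it does not implement the framework-specific contract, so the check rejects it before any record is processed. That would be consistent with the map task failing during setup in the trace above.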

Why can't I use the basic String or Integer classes?

Integer and String implement Java's standard Serializable interface, as the docs show. The problem is that MapReduce does not use this standard interface to serialize and deserialize values; it uses its own interface, called Writable.
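As a rough illustration of what such an interface looks like, here is a minimal sketch in plain Java. Hadoop's Writable declares write(DataOutput) and readFields(DataInput); the WritableLike interface below only mirrors that shape, and IntBox and WritableRoundTrip are hypothetical names made up for this example. It is not Hadoop's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical interface mirroring the shape of Hadoop's Writable.
interface WritableLike {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical int wrapper, similar in spirit to IntWritable.
final class IntBox implements WritableLike {
    private int value;

    IntBox() {}

    IntBox(int value) {
        this.value = value;
    }

    int get() {
        return value;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value); // only the 4 payload bytes, no type information
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt(); // the reader already knows an int is coming
    }
}

final class WritableRoundTrip {
    // Serializes any WritableLike into a byte array.
    static byte[] toBytes(WritableLike w) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Fills a fresh IntBox from previously written bytes.
    static IntBox fromBytes(byte[] bytes) {
        IntBox box = new IntBox();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            box.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return box;
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new IntBox(10));
        System.out.println("bytes written: " + bytes.length);
        System.out.println("value read back: " + fromBytes(bytes).get());
    }
}
```

The object is created first and then filled by readFields, which is why the reader has to know the concrete type in advance. In a MapReduce job that knowledge comes from the key and value classes set on the job configuration.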

So why don't they just use the standard Java interface?

Short answer: because it is more efficient. The Writable interface omits the type information when serializing, because you have already declared the input and output types in your MapReduce code. Since your code already knows what is coming, instead of serializing a String like this:

String: "theStringItself"

It could be serialized like this:

theStringItself

As you can see, this avoids writing type information with every record, which adds up over the very large number of records a job processes.
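The difference can be measured with the standard library alone. The sketch below compares standard Java serialization (ObjectOutputStream) with writing only the payload through DataOutputStream. The DataOutputStream calls are a stand-in for what a Writable does and not Hadoop's actual wire format, and SerializedSizeDemo is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class SerializedSizeDemo {
    // Bytes produced by standard Java serialization, which includes a stream
    // header and type information alongside the value.
    static int javaSerializedSize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // Bytes produced when only the int payload is written.
    static int rawIntSize(int value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    // Bytes produced when only a length prefix and the encoded characters are written.
    static int rawStringSize(String value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("Integer 10, Java serialization: " + javaSerializedSize(Integer.valueOf(10)) + " bytes");
        System.out.println("Integer 10, raw payload:        " + rawIntSize(10) + " bytes");
        System.out.println("String abc, Java serialization: " + javaSerializedSize("abc") + " bytes");
        System.out.println("String abc, raw payload:        " + rawStringSize("abc") + " bytes");
    }
}
```

Running it on the sample data's values ("abc" and 10) shows the per-record overhead directly. The gap is largest for small values such as integers, where the type description is much bigger than the payload itself.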

Long answer: Read this awesome blog post.

Licensed under: CC-BY-SA with attribution