Question

I'm trying to test a simple MapReduce project using MRUnit. I set the input for the mapDriver and then call mapDriver.runTest(). I've also tried mapDriver.run(), but it produces the same error.

I have written a custom key which implements the write(DataOutput out), readFields(DataInput in) and compareTo(...) methods. When debugging, the key correctly writes its data using write(DataOutput out). The key's readFields(DataInput in) method then correctly retrieves the data that was previously written, but as soon as it finishes, the error below is thrown.

I have searched here for similar posts and have tried overriding the hashCode() and equals() methods, to no avail. Does MRUnit require any additional methods to be overridden when using custom keys? The most similar post is "MRUnit with Avro NullPointerException in Serialization". However, I am not using Avro and, as far as I am aware, I am using the default serialization. Cheers!

java.lang.NullPointerException
at org.apache.hadoop.mrunit.Serialization.copy(Serialization.java:61)
at org.apache.hadoop.mrunit.Serialization.copy(Serialization.java:81)
at org.apache.hadoop.mrunit.mapreduce.mock.MockContextWrapper$4.answer(MockContextWrapper.java:78)
at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:31)
at org.mockito.internal.MockHandler.handle(MockHandler.java:97)
at org.mockito.internal.creation.MethodInterceptorFilter.intercept(MethodInterceptorFilter.java:47)
at org.apache.hadoop.mapreduce.Mapper$Context$$EnhancerByMockitoWithCGLIB$$f555e120.write(<generated>)
at model.RMSEEvaluation$Mapper.map(RMSEEvaluation.java:57)
at model.RMSEEvaluation$Mapper.map(RMSEEvaluation.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mrunit.mapreduce.MapDriver.run(MapDriver.java:221)
at org.apache.hadoop.mrunit.MapDriverBase.runTest(MapDriverBase.java:150)
at org.apache.hadoop.mrunit.TestDriver.runTest(TestDriver.java:137)
at test.TestRMSEEvaluation.testSetValues(TestRMSEEvaluation.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at junit.framework.TestCase.runTest(TestCase.java:168)
at junit.framework.TestCase.runBare(TestCase.java:134)
at junit.framework.TestResult$1.protect(TestResult.java:110)
at junit.framework.TestResult.runProtected(TestResult.java:128)
at junit.framework.TestResult.run(TestResult.java:113)
at junit.framework.TestCase.run(TestCase.java:124)
at junit.framework.TestSuite.runTest(TestSuite.java:243)
at junit.framework.TestSuite.run(TestSuite.java:238)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:83)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)

Solution

I have found a solution to this error. It occurred because the serialization type had not been set in the Configuration used by the MapDriver (mapDriver). I had to set the serialization explicitly, as follows:

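// Register both Java and Writable serialization under "io.serializations"
Configuration conf = new Configuration();
conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
            + "org.apache.hadoop.io.serializer.WritableSerialization");
// Hand the configuration to the driver before running the test
mapDriver.setConfiguration(conf);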

Hope this helps anyone with a similar problem!

OTHER TIPS

First, it's worth testing whether the serialization / deserialization really works as expected.
Without knowing how you wrote your test, I can only say that the following simple one works well with MRUnit 0.9.0-incubating and JUnit 4.10:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.junit.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

public class TestCustom {

    private MapDriver<CustomRecord, Text, CustomRecord, Text> mapDriver;

    private Mapper<CustomRecord, Text, CustomRecord, Text> map = 
        new Mapper<CustomRecord, Text, CustomRecord, Text>();

    private Reducer<CustomRecord, Text, CustomRecord, Text> reduce = 
        new Reducer<CustomRecord, Text, CustomRecord, Text>();

    private ReduceDriver<CustomRecord, Text, CustomRecord, Text> reduceDriver
        = ReduceDriver.newReduceDriver(reduce);

    private MapReduceDriver<CustomRecord, Text, CustomRecord, 
      Text, CustomRecord, Text> mapReduceDriver;

    private Configuration conf = new Configuration();

    //test data
    private Pair<CustomRecord, Text> data;

    //shuffled and sorted data
    private static List<Pair<CustomRecord, List<Text>>> shuffledData; 

    @Before
    public void init() {

        mapDriver = MapDriver.newMapDriver(map);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(map, reduce);
        mapDriver.withConfiguration(conf);
        initData();
    }

    private void initData() {

        CustomRecord key = new CustomRecord("first", 1);
        Text value = new Text("key1");
        data = new Pair<CustomRecord, Text>(key, value);
    }

    @Test
    public void testMapper() throws IOException {

        mapDriver.withInput(data);
        //expected output result
        mapDriver.withOutput(data); 
        mapDriver.runTest(true);

        //shuffle and sort
        List<Pair<CustomRecord, Text>> pairs = 
          new ArrayList<Pair<CustomRecord, Text>>();
        pairs.add(data);

        shuffledData = mapReduceDriver.shuffle(pairs);

    }

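    @Test
    public void testReducer() throws IOException {

        // shuffle and sort here as well: JUnit does not guarantee that
        // testMapper() runs before this test
        List<Pair<CustomRecord, Text>> pairs =
          new ArrayList<Pair<CustomRecord, Text>>();
        pairs.add(data);
        shuffledData = mapReduceDriver.shuffle(pairs);

        // feed input to one single reduce call
        Pair<CustomRecord, List<Text>> pair = shuffledData.get(0);
        reduceDriver.withInput(pair.getFirst(), pair.getSecond());

        //reducer's output
        List<Pair<CustomRecord, Text>> result = reduceDriver.run();

        Assert.assertEquals("Key mismatch!", 
          data.getFirst(), result.get(0).getFirst());
        Assert.assertEquals("Value mismatch!", 
          data.getSecond(), result.get(0).getSecond());
    }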
}

It tests the identity mapper and reducer with a custom Writable (CustomRecord) as the key.
Note that the key implements WritableComparable and overrides hashCode and equals.
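
The test above uses CustomRecord without showing it, so here is a minimal sketch of what such a key might look like, together with a round-trip check of write / readFields that needs only the Java standard library. The field names (name, id) and the roundTrip helper are assumptions made for illustration, guessed from the constructor call new CustomRecord("first", 1). In a real job the class would also declare implements org.apache.hadoop.io.WritableComparable<CustomRecord>; that is left out here so the sketch compiles without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

// Hypothetical key class. In a Hadoop project it would additionally declare
// "implements WritableComparable<CustomRecord>"; the method signatures are the same.
class CustomRecord implements Comparable<CustomRecord> {
    private String name; // assumed field
    private int id;      // assumed field

    // No-argument constructor: Hadoop instantiates Writables reflectively
    // and only then calls readFields on the empty instance.
    CustomRecord() {
        this("", 0);
    }

    CustomRecord(String name, int id) {
        this.name = name;
        this.id = id;
    }

    // Fields must be written and read in exactly the same order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(id);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        id = in.readInt();
    }

    @Override
    public int compareTo(CustomRecord other) {
        int byName = name.compareTo(other.name);
        return byName != 0 ? byName : Integer.compare(id, other.id);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CustomRecord)) return false;
        CustomRecord other = (CustomRecord) o;
        return id == other.id && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, id);
    }

    // Illustrative helper: serialize to a byte array and read it back into a fresh instance.
    static CustomRecord roundTrip(CustomRecord original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            original.write(out);
        }
        CustomRecord copy = new CustomRecord();
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFields(in);
        }
        return copy;
    }
}

class RoundTripDemo {
    public static void main(String[] args) throws IOException {
        CustomRecord original = new CustomRecord("first", 1);
        CustomRecord copy = CustomRecord.roundTrip(original);
        System.out.println("round trip preserved the key: " + original.equals(copy));
    }
}
```

If a check like this passes but MRUnit still throws, the key class itself is probably fine and the problem is more likely in how the driver or its Configuration is set up.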

Licensed under: CC-BY-SA with attribution