First it's worth to test whether the serialization / deserialization works really as expected.
Without knowing how you wrote the test, the following simple one works well with MRUnit 0.9.0-incubating and JUnit 4.10 :
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
public class TestCustom {
private MapDriver<CustomRecord, Text, CustomRecord, Text> mapDriver;
private Mapper<CustomRecord, Text, CustomRecord, Text> map =
new Mapper<CustomRecord, Text, CustomRecord, Text>();
private Reducer<CustomRecord, Text, CustomRecord, Text> reduce =
new Reducer<CustomRecord, Text, CustomRecord, Text>();
private ReduceDriver<CustomRecord, Text, CustomRecord, Text> reduceDriver
= ReduceDriver.newReduceDriver(reduce);
private MapReduceDriver<CustomRecord, Text, CustomRecord,
Text, CustomRecord, Text> mapReduceDriver;
private Configuration conf = new Configuration();
//test data
private Pair<CustomRecord, Text> data;
//shuffled and sorted data
private static List<Pair<CustomRecord, List<Text>>> shuffledData;
@Before
public void init() {
mapDriver = MapDriver.newMapDriver(map);
mapReduceDriver = MapReduceDriver.newMapReduceDriver(map, reduce);
mapDriver.withConfiguration(conf);
initData();
}
private void initData() {
CustomRecord key = new CustomRecord("first", 1);
Text value = new Text("key1");
data = new Pair<CustomRecord, Text>(key, value);
}
@Test
public void testMapper() throws IOException {
mapDriver.withInput(data);
//expected output result
mapDriver.withOutput(data);
mapDriver.runTest(true);
//shuffle and sort
List<Pair<CustomRecord, Text>> pairs =
new ArrayList<Pair<CustomRecord, Text>>();
pairs.add(data);
shuffledData = mapReduceDriver.shuffle(pairs);
}
@Test
public void testReducer() throws IOException {
// feed input to one single reduce call
Pair<CustomRecord, List<Text>> pair = shuffledData.get(0);
reduceDriver.withInput(pair.getFirst(), pair.getSecond());
//reducer's output
List<Pair<CustomRecord, Text>> result = reduceDriver.run();
Assert.assertEquals("Key mismatch!",
data.getFirst(), result.get(0).getFirst());
Assert.assertEquals("Value mismatch!",
data.getSecond(), result.get(0).getSecond());
}
}
It tests the identity mapper and reducer with custom Writable as key (CustomRecord).
Note, that the key implements WritableComparable, and overrides hashCode and equals.