Question

I have sets of binary files that I need to split (according to some logic) and distribute to mappers. I use Hadoop streaming for this. The main problem is sending the exact binary chunks over the wire without altering them. It turns out that sending raw bytes is not trivial.

To better illustrate the problem, I wrote a very simple RecordReader implementation that reads some bytes from the split and sends them. Binary data can contain anything (including the newline character). Here is what next() might read:

public class MyRecordReader implements
        RecordReader<BytesWritable, BytesWritable> {
    ...
    public boolean next(BytesWritable key, BytesWritable ignore)
            throws IOException {
        ...

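        // Fill the key with 01..08, then overwrite two bytes with newlines
        // to show that the payload can contain any byte value.
        byte[] result = new byte[8];
        for (int i = 0; i < result.length; ++i)
            result[i] = (byte)(i+1);
        result[3] = (byte)'\n';
        result[4] = (byte)'\n';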

        key.set(result, 0, result.length);
        return true;
    }
}

In this scenario, each call to next() should write the following byte sequence to the mapper's stdin: 01 02 03 0a 0a 06 07 08. If I use typed bytes (HADOOP-1722), the sequence should be prefixed with five bytes in total: the first byte is the type code (0 for bytes) and the other four bytes are the size. So the sequence should look exactly like this: 00 00 00 00 08 01 02 03 0a 0a 06 07 08.

I tested it against /bin/cat to verify the results. The command is the following:
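To make the expected framing concrete, here is a minimal plain-JDK sketch that builds the 13-byte sequence described above. It only models the layout stated in this question (one type byte, a four-byte big-endian length, then the payload); the class and method names (TypedBytesFrame, encode, toHex) are made up for illustration and are not part of Hadoop.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper, not a Hadoop class: models the framing described above.
final class TypedBytesFrame {
    // Type code 0 marks a raw byte sequence.
    static final int TYPE_BYTES = 0;

    /** Frames the payload as: 1 type byte, 4-byte big-endian length, payload. */
    static byte[] encode(byte[] payload) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeByte(TYPE_BYTES);
        out.writeInt(payload.length); // DataOutputStream writes ints big-endian
        out.write(payload);
        out.flush();
        return buffer.toByteArray();
    }

    /** Formats bytes as space-separated lowercase hex, like a hexdump row. */
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = {1, 2, 3, 0x0a, 0x0a, 6, 7, 8};
        // prints: 00 00 00 00 08 01 02 03 0a 0a 06 07 08
        System.out.println(toHex(encode(payload)));
    }
}
```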

hadoop jar <streaming jar location>
  -libjars <my input format jar>
  -D stream.map.input=typedbytes
  -mapper /bin/cat
  -inputformat my.input.Format

Using hexdump to inspect the incoming keys, I got this: 00 00 00 00 08 01 02 03 09 0a 09 0a 06 07 08. As you can see, every 0a (newline) is prefixed with 09 (tab), even though the typed bytes header still gives the correct type and size of the original byte sequence.

This poses a serious problem when writing mappers in other languages, because the bytes are altered on the way.

It seems there is no guarantee that bytes will be sent exactly as they are, unless I am missing something?
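The mismatch can be checked mechanically. The sketch below, again plain JDK with hypothetical names (TypedBytesCheck, readBytesRecord, trailingBytesAfterOneRecord), decodes one record using the header layout described earlier and counts the bytes left over. It assumes the whole dump is a single record, which is how it is presented here.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical helper, not a Hadoop class: decodes the framing described above.
final class TypedBytesCheck {
    /** Reads one record: type byte 0, 4-byte big-endian length, payload. */
    static byte[] readBytesRecord(DataInputStream in) throws IOException {
        int type = in.readUnsignedByte();
        if (type != 0) {
            throw new IOException("unexpected type code " + type);
        }
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload); // throws EOFException if the stream is too short
        return payload;
    }

    /** Bytes remaining after decoding exactly one record; 0 means a clean frame. */
    static int trailingBytesAfterOneRecord(byte[] stream) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
        readBytesRecord(in);
        return in.available();
    }

    public static void main(String[] args) throws IOException {
        byte[] expected = {0, 0, 0, 0, 8, 1, 2, 3, 0x0a, 0x0a, 6, 7, 8};
        byte[] observed = {0, 0, 0, 0, 8, 1, 2, 3, 0x09, 0x0a, 0x09, 0x0a, 6, 7, 8};
        System.out.println(trailingBytesAfterOneRecord(expected)); // prints 0
        System.out.println(trailingBytesAfterOneRecord(observed)); // prints 2
    }
}
```

With the observed dump, a reader that trusts the header consumes 01 02 03 09 0a 09 0a 06 as the payload and leaves 07 08 behind, so every later record would be misaligned as well.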


Solution

I figured out the solution to this problem thanks to a very helpful hint on the hadoop-user mailing list.

In short, we need to override how Hadoop streaming writes data to and reads data from the standard streams. To do this:

  1. Extend InputWriter and OutputReader, and also provide your own InputFormat and OutputFormat, so that you completely control how bytes are written to and read from the stream.
  2. Extend the IdentifierResolver class to tell Hadoop to use your own InputWriter and OutputReader.

Use your IdentifierResolver, InputFormat, and OutputFormat as follows:

hadoop jar <streaming jar location>
-D stream.io.identifier.resolver.class=my.own.CustomIdentifierResolver
-libjars <my input format jar>
-mapper /bin/cat
-inputformat my.own.CustomInputFormat
-outputformat my.own.CustomOutputFormat
<other options ...>

The patch attached to MAPREDUCE-5018 (a feature that has not been merged) is a great source on how to do this and can be customized to fit one's needs.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow