Question

I'm running a Hadoop job on many input files, but if one of the files is corrupted, the whole job fails.

How can I make the job ignore the corrupted file? Ideally it would write a counter or an error log entry, but not fail the whole job.


Solution

It depends on where your job is failing. If a line is corrupt and an exception is thrown somewhere in your map method, then you should be able to wrap the body of your map method in a try / catch and log the error:

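protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  try {
    // parse value to an int
    int val = Integer.parseInt(value.toString());

    // do something with key and val..
  } catch (NumberFormatException nfe) {
    // count the bad record and continue (the group / counter names are arbitrary)
    context.getCounter("Parse", "MALFORMED_RECORDS").increment(1);
  }
}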

But if the error is thrown by your InputFormat's RecordReader, then you'll need to amend the mapper's run(..) method, whose default implementation is as follows:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

So you could amend this to catch the exception thrown by the context.nextKeyValue() call, but be careful about blindly ignoring errors thrown by the reader: an IOException, for example, may not be 'skippable' just by ignoring it.

If you have written your own InputFormat / RecordReader, and you have a specific exception which denotes record failure but will allow you to skip over and continue parsing, then something like this will probably work:

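public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (true) {
    try {
      if (!context.nextKeyValue()) {
        break;
      }
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    } catch (SkippableRecordException sre) {
      // SkippableRecordException is your own exception type, thrown by your RecordReader
      // log error, then loop round to read the next record
    }
  }
  cleanup(context);
}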

But just to reiterate: your RecordReader must be able to recover after an error, otherwise the above code could send you into an infinite loop.
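To make the infinite-loop risk concrete, here is a minimal plain-Java model with no Hadoop dependency. The names LineReader and SkippingReaderSketch are made up for illustration, and SkippableRecordException is assumed to be an unchecked exception of your own. The detail that matters is that the reader advances its position before it throws, so the next pass of the loop sees the following record rather than the same bad one.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical unchecked exception meaning "this record is bad, but the reader can go on".
class SkippableRecordException extends RuntimeException {
  SkippableRecordException(String message) { super(message); }
}

// Toy stand-in for a RecordReader over lines holding integers (not a Hadoop class).
class LineReader {
  private final List<String> lines;
  private int pos = 0;
  private int current;

  LineReader(List<String> lines) { this.lines = lines; }

  boolean nextKeyValue() {
    if (pos >= lines.size()) {
      return false;
    }
    String raw = lines.get(pos);
    pos++; // advance BEFORE parsing, so a bad record is never read twice
    try {
      current = Integer.parseInt(raw);
    } catch (NumberFormatException nfe) {
      throw new SkippableRecordException("bad record: " + raw);
    }
    return true;
  }

  int getCurrentValue() { return current; }
}

class SkippingReaderSketch {
  // Same loop shape as the run(..) method above; returns {sum of good values, skipped count}.
  static long[] run(List<String> lines) {
    LineReader reader = new LineReader(lines);
    long sum = 0;
    long skipped = 0;
    while (true) {
      try {
        if (!reader.nextKeyValue()) {
          break;
        }
        sum += reader.getCurrentValue();
      } catch (SkippableRecordException sre) {
        skipped++; // stands in for "log error"
      }
    }
    return new long[] {sum, skipped};
  }

  public static void main(String[] args) {
    long[] result = run(Arrays.asList("1", "oops", "3"));
    System.out.println("sum=" + result[0] + " skipped=" + result[1]);
  }
}
```

If the `pos++` line were moved after the parse, the reader would throw on "oops" forever and the loop would never end, which is exactly the failure mode described above.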

For your specific case - if you just want to ignore a file upon the first failure then you can update the run method to something much simpler:

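public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } catch (Exception e) {
    // log error and give up on the rest of this input
    // (the group / counter names are arbitrary)
    context.getCounter("Input", "ABANDONED_INPUTS").increment(1);
  } finally {
    // still run cleanup when the input was abandoned part-way through
    cleanup(context);
  }
}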

Some final words of warning:

  • You need to make sure that it isn't your mapper code that is causing the exception to be thrown, otherwise you'll be ignoring files for the wrong reason
  • Files that are treated as GZip compressed (for example, because of a .gz extension) but do not actually contain GZip data will fail during the initialization of the record reader, so the above will not catch this type of error (you'll need to write your own record reader implementation). This is true for any file error that is thrown during record reader creation
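As a rough illustration of the second point, the sketch below uses the JDK's java.util.zip classes rather than Hadoop's codec classes, and the names GzipOpenSketch, openOrSkip and gzip are made up. It shows that a bad GZip header is rejected at open time, before any record is read, and how an initialization step could swallow that error and report "nothing to read" instead of failing.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipOpenSketch {
  // Tries to open the bytes as GZip. The GZIPInputStream constructor reads the header,
  // so invalid data fails here, at open time. Returns empty instead of propagating.
  static Optional<BufferedReader> openOrSkip(byte[] fileBytes) {
    try {
      GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(fileBytes));
      return Optional.of(new BufferedReader(new InputStreamReader(gz, StandardCharsets.UTF_8)));
    } catch (IOException e) {
      // stands in for "log error / increment a counter"
      return Optional.empty();
    }
  }

  // Helper for the demo: produce valid GZip bytes from a string.
  static byte[] gzip(String text) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
      out.write(text.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  public static void main(String[] args) {
    byte[] good = gzip("42\n");
    byte[] bad = "42\n".getBytes(StandardCharsets.UTF_8); // plain text posing as a .gz file
    System.out.println("good opens: " + openOrSkip(good).isPresent());
    System.out.println("bad opens: " + openOrSkip(bad).isPresent());
  }
}
```

In a real job the equivalent logic would live in the initialization of your own record reader, which would then report no records for that input.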

OTHER TIPS

This is what Failure Traps are used for in Cascading:

Whenever an operation fails and throws an exception, if there is an associated trap, the offending Tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.

This will essentially let your job continue and let you check your corrupt files later.

If you are somewhat familiar with Cascading, you add the trap in your flow definition statement:

    new FlowDef().addTrap( String branchName, Tap trap );

Failure Traps

There is also another possible way: you could use the mapred.max.map.failures.percent configuration option, which sets the percentage of map tasks that are allowed to fail without the whole job failing. Of course, solving the problem this way could also hide other problems occurring during the map phase.
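To get a feel for what a given percentage tolerates, here is a small arithmetic sketch in plain Java. It assumes the threshold is compared against the share of map tasks that have failed for good (that is, after their retry attempts are used up), and the class and method names are made up. Note that it counts map tasks, not files: the two only line up when each file is read by a single map task.

```java
class MapFailureThresholdSketch {
  // Assumed rule: the job survives while the failed tasks do not exceed
  // maxFailuresPercent of all map tasks.
  static boolean jobSurvives(int failedMapTasks, int totalMapTasks, int maxFailuresPercent) {
    return (long) failedMapTasks * 100 <= (long) maxFailuresPercent * totalMapTasks;
  }

  public static void main(String[] args) {
    // 200 map tasks with 1 percent tolerated
    System.out.println(jobSurvives(2, 200, 1)); // 2 of 200 is exactly 1 percent
    System.out.println(jobSurvives(3, 200, 1)); // 3 of 200 is 1.5 percent
  }
}
```

Under that assumption, 200 map tasks with a setting of 1 would tolerate two failed tasks but not three, so the value has to be picked with the expected number of corrupt files in mind.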

Licensed under: CC-BY-SA with attribution