Pregunta

Please go a little easy on me cause I am only 3 months old in Hadoop and Mapreduce.

I've 2 files 120 MB each, The data inside each file is completely unstructured but with a common pattern. Because of the varying structure of data my requirement can not be sufficed by the default LineInputFormat.

Hence While reading the file I override the isSplitable() method and stop the split by returning false. so that 1 mapper can access one complete file and I can perform my logic and achieve the requirement.

My machine can run two mappers in parallel, So by stopping the split i am degrading the performance by running the mapper one by one for each file rather then running two mappers parallely for a file.

My Question is How can I run two mappers in parallel for both the files so the performance improves.

For Example

When split was allowed:
    file 1: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min 
    file 2: split 1 (1st mapper) || split 2 (2nd mapper)------ 2 min

    Total Time for reading two files =====  4 min

When Split not allowed:
    file 1: no parallel jobs so (1st mapper)---------4 min
    file 2: no parallel jobs so (1st mapper)---------4 min

    Total Time to read two files ===== 8 min (Performance degraded)

What I want
    File 1 (1st Mapper) || file 2 (2nd Mapper) ------4 min

    Total time to read two files ====== 4 min 

Basically I want both the Files to be read at the same time by two different mapper.

Please help me in achieving the scenario.

Below are my Custom InputFormat and Custom RecordReader Code.

public class NSI_inputformatter extends FileInputFormat<NullWritable, Text>{
@Override
public boolean isSplitable(FileSystem fs, Path filename)
{
    //System.out.println("Inside the isSplitable Method of NSI_inputformatter");
    return false;
}

@Override
public RecordReader<NullWritable, Text> getRecordReader(InputSplit split,
        JobConf job_run, Reporter reporter) throws IOException {
    // TODO Auto-generated method stub
    //System.out.println("Inside the getRecordReader method of NSI_inputformatter");

    return new NSI_record_reader(job_run, (FileSplit)split);
}

}

Record Reader:

public class NSI_record_reader implements RecordReader<NullWritable, Text> 
{
FileSplit split;
JobConf job_run;
String text;
public boolean processed=false;
public NSI_record_reader(JobConf job_run, FileSplit split)
{
    //System.out.println("Inside the NSI_record_reader constructor");
    this.split=split;
    this.job_run=job_run;

    //System.out.println(split.toString());
}
@Override
public boolean next(NullWritable key, Text value) throws IOException {
    // TODO Auto-generated method stub
    //System.out.println("Inside the next method of the NLI_record_reader");
    if (!processed)
    {
        byte [] content_add=new byte[(int)(split.getLength())];
        Path file=split.getPath();
        FileSystem fs=file.getFileSystem(job_run);
        FSDataInputStream input=null;


        try{
            input=fs.open(file);
            System.out.println("the input is " +input+ input.toString());
            IOUtils.readFully(input, content_add, 0, content_add.length);
            value.set(content_add, 0, content_add.length);
        }
        finally
        {
            IOUtils.closeStream(input);

        }
        processed=true;
        return true;
    }

    return false;
}

@Override
public void close() throws IOException {
    // TODO Auto-generated method stub

}

@Override
public NullWritable createKey() {
    System.out.println("Inside createkey() mrthod of NSI_record_reader");
    // TODO Auto-generated method stub
    return  NullWritable.get();
}

@Override
public Text createValue() {
    System.out.println("Inside createValue() mrthod of NSI_record_reader");
    // TODO Auto-generated method stub
    return new Text();
}

@Override
public long getPos() throws IOException {
    // TODO Auto-generated method stub
    System.out.println("Inside getPs() mrthod of NSI_record_reader");
    return processed ? split.getLength() : 0;
}

@Override
public float getProgress() throws IOException {
    // TODO Auto-generated method stub
    System.out.println("Inside getProgress() mrthod of NSI_record_reader");
    return processed ? 1.0f : 0.0f;
}

}

Input Sample:

<Dec 12, 2013 1:05:56 AM CST> <Error> <HTTP> <BEA-101017>       <[weblogic.servlet.internal.WebAppServletContext@42e87d99 - appName: 'Agile', name:    '/Agile', context-path: '/Agile', spec-version: 'null'] Root cause of ServletException.
  javax.servlet.jsp.JspException: Connection reset by peer: socket write error
at com.agile.ui.web.taglib.common.FormTag.writeFormHeader(FormTag.java:498)
at com.agile.ui.web.taglib.common.FormTag.doStartTag(FormTag.java:429)
at jsp_servlet._default.__login_45_cms._jspService(__login_45_cms.java:929)
at weblogic.servlet.jsp.JspBase.service(JspBase.java:34)
at    weblogic.servlet.internal.StubSecurityHelper$ServletServiceAction.run(StubSecurityHelper.ja va:227)
Truncated. see log file for complete stacktrace
>
Retrieving the value for the attribute Page Two.Validation Status for the Object 769630
Retrieving the value for the attribute Page Two.Pilot Required for the Object 769630
Retrieving the value for the attribute Page Two.NPO Contact for the Object 769630
<Dec 12, 2013 1:12:13 AM CST> <Warning> <Socket> <BEA-000449> <Closing socket as no         data read from it during the configured idle timeout of 0 secs> 

Thanks.

¿Fue útil?

Solución

You could try to set property -D mapred.min.split.size=209715200. In this case FileInputFormat should not split your files because they are smaller than mapred.min.split.size.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top