Почему мой файл последовательности прочитан дважды в моем классе Hadoop Mapper?

StackOverflow https://stackoverflow.com/questions/9514710

  •  14-11-2019
  •  | 
  •  

Вопрос

У меня есть последовательность с 1264 записями. Каждый ключ уникален для каждой записи. Моя проблема в том, что мой Mapper, кажется, читает этот файл дважды, или он читается дважды. Для проверки здравомыслие я написал небольшое утилиту для чтения последовательности и действительно, есть только 1264 записей (I.e. Sequencefile.Reader).

В моем редукторе мне следует получить только 1 запись за счет, однако, когда я повторяю повторную форму (итератор), я получаю 2 записи на ключ (всегда 2 на ключ, а не 1 или 3 или что-то еще на ключ ).

Вывод журнала моей работы ниже. Я не уверен, почему, но почему это то, что «общие входные пути к обрабатыванию» 2? Когда я бежу свою работу, я попробовал --dmapred.input.dir= / data, а также -dmapred.input.dir= / data / part-r-00000, но все же, общие пути к обработке 2.

Любые идеи ценятся.

12/03/01 05:28:30 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/03/01 05:28:30 INFO input.FileInputFormat: Total input paths to process : 2
12/03/01 05:28:31 INFO mapred.JobClient: Running job: job_local_0001
12/03/01 05:28:31 INFO input.FileInputFormat: Total input paths to process : 2
12/03/01 05:28:31 INFO mapred.MapTask: io.sort.mb = 100
12/03/01 05:28:31 INFO mapred.MapTask: data buffer = 79691776/99614720
12/03/01 05:28:31 INFO mapred.MapTask: record buffer = 262144/327680
12/03/01 05:28:31 INFO mapred.MapTask: Starting flush of map output
12/03/01 05:28:31 INFO mapred.MapTask: Finished spill 0
12/03/01 05:28:31 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/03/01 05:28:31 INFO mapred.MapTask: io.sort.mb = 100
12/03/01 05:28:31 INFO mapred.MapTask: data buffer = 79691776/99614720
12/03/01 05:28:31 INFO mapred.MapTask: record buffer = 262144/327680
12/03/01 05:28:31 INFO mapred.MapTask: Starting flush of map output
12/03/01 05:28:31 INFO mapred.MapTask: Finished spill 0
12/03/01 05:28:31 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.Merger: Merging 2 sorted segments
12/03/01 05:28:31 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 307310 bytes
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:32 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/03/01 05:28:32 INFO mapred.LocalJobRunner:
12/03/01 05:28:32 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/03/01 05:28:32 INFO mapred.JobClient:  map 100% reduce 0%
12/03/01 05:28:32 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to results
12/03/01 05:28:32 INFO mapred.LocalJobRunner: reduce > reduce
12/03/01 05:28:32 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/03/01 05:28:33 INFO mapred.JobClient:  map 100% reduce 100%
12/03/01 05:28:33 INFO mapred.JobClient: Job complete: job_local_0001
12/03/01 05:28:33 INFO mapred.JobClient: Counters: 12
12/03/01 05:28:33 INFO mapred.JobClient:   FileSystemCounters
12/03/01 05:28:33 INFO mapred.JobClient:     FILE_BYTES_READ=1320214
12/03/01 05:28:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1275041
12/03/01 05:28:33 INFO mapred.JobClient:   Map-Reduce Framework
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce input groups=1264
12/03/01 05:28:33 INFO mapred.JobClient:     Combine output records=0
12/03/01 05:28:33 INFO mapred.JobClient:     Map input records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce output records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Spilled Records=5056
12/03/01 05:28:33 INFO mapred.JobClient:     Map output bytes=301472
12/03/01 05:28:33 INFO mapred.JobClient:     Combine input records=0
12/03/01 05:28:33 INFO mapred.JobClient:     Map output records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce input records=2528
.

Мой класс Mapper очень прост. Это читает в текстовом файле. К каждой строке он добавляет «м» к линии.

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

 private static final Log _log = LogFactory.getLog(MyMapper.class);

 @Override
 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  String s = (new StringBuilder()).append(value.toString()).append("m").toString();
  context.write(key, new Text(s));
  _log.debug(key.toString() + " => " + s);
 }
}
.

Мой класс редуктора тоже очень прост. Это просто добавляет «R» в строку.

public class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

private static final Log _log = LogFactory.getLog(MyReducer.class);

@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 for(Iterator<Text> it = values.iterator(); it.hasNext();) {
  Text txt = it.next();
  String s = (new StringBuilder()).append(txt.toString()).append("r").toString();
  context.write(key, new Text(s));
  _log.debug(key.toString() + " => " + s);
  }
 }
}
.

Мой класс работы выглядит следующим образом.

public class MyJob extends Configured implements Tool {

public static void main(String[] args) throws Exception {
 ToolRunner.run(new Configuration(), new MyJob(), args);
}

@Override
public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 Path input = new Path(conf.get("mapred.input.dir"));
 Path output = new Path(conf.get("mapred.output.dir"));

 System.out.println("input = " + input);
 System.out.println("output = " + output);

 Job job = new Job(conf, "dummy job");
 job.setMapOutputKeyClass(LongWritable.class);
 job.setMapOutputValueClass(Text.class);
 job.setOutputKeyClass(LongWritable.class);
 job.setOutputValueClass(Text.class);

 job.setMapperClass(MyMapper.class);
 job.setReducerClass(MyReducer.class);

 FileInputFormat.addInputPath(job, input);
 FileOutputFormat.setOutputPath(job, output);

 job.setJarByClass(MyJob.class);

 return job.waitForCompletion(true) ? 0 : 1;
 }
}
.

Мои входные данные выглядят следующие.

T, T
T, T
T, T
F, F
F, F
F, F
F, F
T, F
F, T
.

После запуска моей работы я получаю вывод, как следующее.

0   T, Tmr
0   T, Tmr
6   T, Tmr
6   T, Tmr
12  T, Tmr
12  T, Tmr
18  F, Fmr
18  F, Fmr
24  F, Fmr
24  F, Fmr
30  F, Fmr
30  F, Fmr
36  F, Fmr
36  F, Fmr
42  T, Fmr
42  T, Fmr
48  F, Tmr
48  F, Tmr
.

Я сделал что-то не так с настройкой моей работы? Я попробовал следующий путь, чтобы запустить свою работу, и в этом подходе файл прочитан только один раз. почему это? System.out.println (itsath) и system.out.println (outsath) Значения идентичны! помочь?

public class MyJob2 {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: MyJob2 <in> <out>");
      System.exit(2);
    }

    String sInput = args[0];
    String sOutput = args[1];

    Path input = new Path(sInput);
    Path output = new Path(sOutput);

    System.out.println("input = " + input);
    System.out.println("output = " + output);

    Job job = new Job(conf, "dummy job");
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);

    job.setJarByClass(MyJob2.class);

    int result = job.waitForCompletion(true) ? 0 : 1;
    System.exit(result);
 }
}
.

Это было полезно?

Решение

i got help from the hadoop mailing list. my problem was with the line below.

FileInputFormat.addInputPath(job, input);

this line simply appends input back to config. after commenting this line out, the input file is read only once now. in fact, i also commented out the other line,

FileOutputFormat.setOutputPath(job, output);

and everything still works.

Другие советы

I've had a similar problem, but for a different reason: linux apparently created a hidden copy of my input file (~input.txt), so that's a second way of getting this error..

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top