I was facing the same problem and found the following solution.
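In the spout, when you read the file, create the FileReader object in the open() method. open() is called on the worker node when the spout task is initialized, so the reader is created on the machine that actually reads the file.
Then use that object in the nextTuple() method.
(This is with a topology of one spout and one bolt.)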
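Why open() and not the constructor: Storm serializes the spout object when the topology is submitted and deserializes it on each worker, and FileReader is not Serializable. The plain-Java sketch below has no Storm dependency; the classes EagerSpout and LazySpout are hypothetical stand-ins that use ordinary Java serialization to show the difference.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationDemo {

    // Hypothetical: opens the file in the constructor, i.e. on the machine
    // that builds the topology, before the object is serialized.
    static class EagerSpout implements Serializable {
        private final String filename;
        private FileReader fileReader;

        EagerSpout(String filename) throws IOException {
            this.filename = filename;
            this.fileReader = new FileReader(filename);
        }
    }

    // Hypothetical: keeps only the file name until open() is called
    // (in Storm, that happens on the worker after deserialization).
    static class LazySpout implements Serializable {
        private final String filename;
        private FileReader fileReader; // stays null until open()

        LazySpout(String filename) {
            this.filename = filename;
        }

        void open() throws IOException {
            this.fileReader = new FileReader(filename);
        }
    }

    // Returns true if Java serialization of o succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // a non-null field (here: FileReader) is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("spout", ".txt");
        tmp.deleteOnExit();
        System.out.println("eager: " + canSerialize(new EagerSpout(tmp.getPath())));
        System.out.println("lazy:  " + canSerialize(new LazySpout(tmp.getPath())));
    }
}
```

Running main() should print `eager: false` and `lazy:  true`: a null FileReader field serializes fine, an open one does not.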
Here is the code for the open() and nextTuple() methods:
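
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Map;

    // Storm 1.0+ package names; releases before 1.0 used backtype.storm.* instead
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.tuple.Values;

    // Spout fields used below (filename is assumed to be set in the constructor):
    //   private String filename;
    //   private TopologyContext context;
    //   private SpoutOutputCollector collector;
    //   private FileReader fileReader;
    //   private boolean completed = false;

    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {
        try {
            this.context = context;
            File file = new File(filename);
            // Created here, on the worker, so the reader never has to be
            // serialized along with the spout
            this.fileReader = new FileReader(file);
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + filename + "]", e);
        }
        this.collector = collector;
    }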
    public void nextTuple() {
        /*
         * nextTuple() is called over and over, so once the whole file
         * has been read, just wait a moment and return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit each line as a new tuple, using the line itself as the message ID
                this.collector.emit(new Values(str), str);
            }
        } catch (IOException e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
            try {
                reader.close(); // also closes the underlying FileReader
            } catch (IOException e) {
                // Nothing useful to do if closing fails
            }
        }
    }

Output:

    Kaveen,bigdata,29
    varadha,cshart,30
    vignesh,winrt,21
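Storm calls nextTuple(), ack() and fail() in a tight loop on a single thread of the spout task, so reading the whole file inside one nextTuple() call holds up acks and fails until the file is done. A variation worth considering is to emit at most one line per call. The sketch below leaves the Storm API out: the class OneLinePerCall is hypothetical, a Consumer stands in for SpoutOutputCollector.emit, and a StringReader stands in for the FileReader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OneLinePerCall {
    private final Consumer<String> emit; // stand-in for collector.emit(...)
    private BufferedReader reader;
    private boolean completed = false;

    OneLinePerCall(Consumer<String> emit) {
        this.emit = emit;
    }

    // Stand-in for open(): wrap the reader once instead of on every call
    void open(Reader source) {
        this.reader = new BufferedReader(source);
    }

    // Emits at most one line per call and returns right away
    void nextTuple() {
        if (completed) {
            return; // a real spout would sleep briefly here
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true;
                reader.close();
            } else {
                emit.accept(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        OneLinePerCall spout = new OneLinePerCall(emitted::add);
        spout.open(new StringReader("Kaveen,bigdata,29\nvaradha,cshart,30\nvignesh,winrt,21\n"));
        for (int i = 0; i < 10; i++) {
            spout.nextTuple(); // calls past the end of the input emit nothing
        }
        System.out.println(emitted);
    }
}
```

Each call does a small, bounded amount of work, which leaves room for Storm to process acks and fails between lines.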
Another possible cause:
You may be running more than one instance of the spout, in which case every instance reads the file and emits the same lines again; or the file may be written in append mode.
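If you only need one reader, the simplest fix is a parallelism hint of 1 when building the topology, e.g. `builder.setSpout("spout", new YourFileSpout(...), 1)` (YourFileSpout is a placeholder). If you do want several instances, each one can emit only its own share of the lines. In Storm the instance index and count should be available from the TopologyContext passed to open() (getThisTaskIndex() and getComponentTasks(context.getThisComponentId()).size()); the sketch below leaves Storm out and only shows the partitioning rule, line number modulo instance count. PartitionedRead and linesForTask are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionedRead {

    // Lines that instance taskIndex (0-based) out of numTasks instances
    // should emit: every numTasks-th line, starting at line taskIndex.
    static List<String> linesForTask(List<String> lines, int taskIndex, int numTasks) {
        List<String> mine = new ArrayList<>();
        for (int lineNo = 0; lineNo < lines.size(); lineNo++) {
            if (lineNo % numTasks == taskIndex) {
                mine.add(lines.get(lineNo));
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> file = Arrays.asList("Kaveen,bigdata,29", "varadha,cshart,30", "vignesh,winrt,21");
        int numTasks = 2;
        for (int task = 0; task < numTasks; task++) {
            // Together, the instances emit every line exactly once
            System.out.println("task " + task + ": " + linesForTask(file, task, numTasks));
        }
    }
}
```

With two instances, task 0 gets the first and third lines and task 1 gets the second, so no line is emitted twice.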