Question

Hi, I created a Storm program that reads a text file, input.txt, line by line in the spout class and emits each line as a tuple to a bolt. In the bolt class I want to write the tuples to output.txt. I am almost done, but the problem is that Storm writes the same lines to the output file many times. Here are my input.txt and output.txt files:

input.txt

Kaveen,bigdata,29
varadha,cshart,30
vignesh,winrt,21

output.txt

varadha,cshart,30
vignesh,winrt,21
Kaveen,bigdata,29
varadha,cshart,30
Kaveen,bigdata,29
Kaveen,bigdata,29
vignesh,winrt,21

I want the output file to contain exactly the same lines as the input file; the order does not matter. How do I achieve that? Please help.

Solution

I faced the same problem and found the following solution.

In the spout, create the FileReader object in the open() method, because that is when the reader is initialized for the worker node, and then use that object in the nextTuple() method. (This is with one spout and one bolt.)

The following is the code for the open() and nextTuple() methods:

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
        try {
            // Open the reader once, when the spout is initialized on the worker
            this.fileReader = new FileReader(new File(filename));
        } catch (FileNotFoundException e) {
            // Keep the original exception as the cause for easier debugging
            throw new RuntimeException("Error reading file [" + filename + "]", e);
        }
    }

    public void nextTuple() {
        // nextTuple() is called repeatedly for the lifetime of the spout, so
        // once the whole file has been read, just wait briefly and return.
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can see it
                Thread.currentThread().interrupt();
            }
            return;
        }
        String str;
        // Wrap the FileReader that was created once in open()
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit one tuple per line, using the line itself as the message id
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            // Mark the file as consumed so it is never read again
            completed = true;
        }
    }

Output:

Kaveen,bigdata,29
varadha,cshart,30
vignesh,winrt,21
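
The open-once idea can also be tried outside Storm. The following is only a minimal sketch in plain Java, not Storm code: `SingleReaderSource`, `nextLine()` and `drain()` are hypothetical names that stand in for the spout, `nextTuple()` and the framework's repeated calls. It assumes a single source instance, and it returns one line per call instead of reading the whole file in one call.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical Storm-free stand-in for a spout: open() runs once, nextLine() runs many times.
class SingleReaderSource {
    private BufferedReader reader;
    private boolean completed;

    // Mirrors the spout's open(): the reader is created exactly once.
    void open(Path file) throws IOException {
        reader = Files.newBufferedReader(file, StandardCharsets.UTF_8);
    }

    // Mirrors nextTuple(): returns the next unread line, or null once the file is exhausted.
    String nextLine() throws IOException {
        if (completed) {
            return null;
        }
        String line = reader.readLine();
        if (line == null) {
            completed = true;
            reader.close();
        }
        return line;
    }

    // Calls nextLine() a fixed number of times on one instance and collects the non-null lines.
    static List<String> drain(Path file, int calls) throws IOException {
        SingleReaderSource source = new SingleReaderSource();
        source.open(file);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            String line = source.nextLine();
            if (line != null) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path in = Files.createTempFile("input", ".txt");
        Files.write(in, Arrays.asList("Kaveen,bigdata,29", "varadha,cshart,30", "vignesh,winrt,21"),
                StandardCharsets.UTF_8);
        // Ten calls, three lines: each line should appear only once.
        System.out.println(drain(in, 10));
    }
}
```

Because the reader keeps its position between calls, calling `nextLine()` more often than there are lines does not re-read the file. A second, independent instance would still read the whole file again.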

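Other possible causes:

You may be running more than one instance of the spout (for example, through a parallelism hint greater than 1), in which case each instance reads the file and the same lines are emitted repeatedly. Alternatively, the output file may be opened in append mode, so lines from earlier runs are still in it.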

Other suggestions

Before writing to output.txt, open the file in append mode. Whenever a write occurs, append the content to output.txt, checking for duplicate records in the file as you do so.
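
One way this append-with-duplicate-check could look is sketched below in plain Java. `DedupLineWriter` is a hypothetical helper, not part of Storm, and the sketch assumes a single bolt instance writes the file, since the set of seen lines lives in memory. It also assumes that identical lines in the input should be written only once.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not a Storm class): appends each distinct line at most once.
class DedupLineWriter {
    private final Path output;
    private final Set<String> seen = new HashSet<>();

    DedupLineWriter(Path output) throws IOException {
        this.output = output;
        // Preload lines already in the file so records from earlier runs count as duplicates.
        if (Files.exists(output)) {
            seen.addAll(Files.readAllLines(output, StandardCharsets.UTF_8));
        }
    }

    // Returns true if the line was appended, false if it was already present.
    boolean write(String line) throws IOException {
        if (!seen.add(line)) {
            return false;
        }
        // Open in append mode for each write; CREATE covers a missing file.
        try (BufferedWriter w = Files.newBufferedWriter(output, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            w.write(line);
            w.newLine();
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempFile("output", ".txt");
        DedupLineWriter writer = new DedupLineWriter(out);
        // The duplicated tuples from the question, in the order they were written.
        String[] tuples = {
            "varadha,cshart,30", "vignesh,winrt,21", "Kaveen,bigdata,29", "varadha,cshart,30",
            "Kaveen,bigdata,29", "Kaveen,bigdata,29", "vignesh,winrt,21"
        };
        for (String t : tuples) {
            writer.write(t);
        }
        System.out.println(Files.readAllLines(out, StandardCharsets.UTF_8));
    }
}
```

In a bolt, a call like `write(...)` would sit where the tuple is currently written to the file. This only hides the duplicates, though; if several spout instances are reading the input, fixing that is the more direct cure.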

It looks like you have more than one spout instance reading the input file, which causes the duplicate records in the output.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow