IO Exception - read end dead - what causes it in this example and how to fix it - multithread application in Java

StackOverflow https://stackoverflow.com/questions/22181107

Question

This is an extension of my question posted here, while that seems to have fixed one part of my problem, now I see IO-Exception read end dead exception. I am using a multithreaded application where thread-1 produces random numbers and other thread-2 consumes it to calculate the average. once the average reaches a threshold, I signal thread-1 to stop producing the numbers. This is the basic design of the code.

I am getting IO-Exception read end dead exception. I want to know why it comes and how to fix it. Thanks.

The code below :

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

//

class NumGen extends Thread {

    PipedOutputStream pos;
    DataOutputStream dos;
    AtomicBoolean isDone;

    public NumGen(PipedOutputStream pos,AtomicBoolean isDone){
        this.pos=pos;
        dos=new DataOutputStream(pos);
        this.isDone=isDone;
    }

    public void run(){
        while (!isDone.get()){
            Random rand = new Random();
            try {
                dos.writeDouble(rand.nextDouble()+100.0);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

class RunningAvg extends Thread {

    PipedInputStream pis;
    DataInputStream dis;
    Double avg;
    int count;
    Double runningTotal;
    AtomicBoolean isDone;

    public RunningAvg(PipedInputStream pis,AtomicBoolean isDone){
        this.pis=pis;
        dis=new DataInputStream(pis);
        runningTotal=0.0;
        avg=0.0;
        this.isDone=isDone;
    }

    public void run(){
        try {
        while (dis.available()>0){
            count+=1;
            runningTotal+=dis.readDouble();
                avg=runningTotal/count;
                System.out.printf("The average in count no : %s is %s%n",count,avg);
                if (avg>1E5)
                  isDone.set(true);
        }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }


public class InterThreadComm {

    public static void main(String[] args){


    try {
        PipedOutputStream pos= new PipedOutputStream();
        PipedInputStream pis = new PipedInputStream(pos);
        AtomicBoolean isDone = new AtomicBoolean(false);
        NumGen ng = new NumGen(pos,isDone);
        RunningAvg ra = new RunningAvg(pis,isDone);
        ng.start();
        ra.start();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    }


}

EDIT: As per the answer below: I tried to close the Streams using try-with-resources. but I get java.io.IOException: Pipe closed

public class InterThreadComm {

    public static void main(String[] args){


    try(PipedOutputStream pos= new PipedOutputStream();PipedInputStream pis =new PipedInputStream(pos)){ 
        AtomicBoolean isDone = new AtomicBoolean(false);
        NumGen ng = new NumGen(pos,isDone);
        RunningAvg ra = new RunningAvg(pis,isDone);
        ng.start();
        ra.start();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    }


}
Was it helpful?

Solution

The root cause is here:

while (dis.available()>0){

What happens is the consumer thread is able to consume all of the data sometimes. There's nothing available so it breaks early.

The exception gets thrown because PipedInputStream/PipedOutputStream keep track of what threads are reading and writing. There is a private method called checkStateForReceive that throws, basically it is complaining that your consumer thread has ended:

} else if (readSide != null && !readSide.isAlive()) {
    throw new IOException("Read end dead");
}

(readSide is the consumer Thread.)

You can see this get called in the stack trace:

java.io.IOException: Read end dead
    at java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:246)
    at java.io.PipedInputStream.receive(PipedInputStream.java:210)
    at java.io.PipedOutputStream.write(PipedOutputStream.java:132)
    at java.io.DataOutputStream.writeLong(DataOutputStream.java:207)
    at java.io.DataOutputStream.writeDouble(DataOutputStream.java:242)

So write calls receieve on the input stream, which calls checkStateForReceieve and throws if the reading Thread isn't alive.

I think your loop condition should just be:

while(!isDone.get()) {

Unrelated to the exception, I also think you might have essentially an infinite loop here:

if (avg>1E5)
    isDone.set(true);

Your number generator generates numbers between 100 and 101 so averaging them will never be greater than 1e5. You might have meant to check if runningTotal > 1E5.

Also:

  • Don't forget to close your streams when you are done with them.
  • Your consumer thread catches IOException outside its loop but the producer catches inside the loop. The producer doesn't abort if there's an exception: it just keeps going, catching exceptions over and over. You might want to move the catch to outside the loop like the consumer.

About your edit:

If you want to close the streams in main, you can join on the threads.

try(
    PipedOutputStream pos= new PipedOutputStream();
    PipedInputStream pis =new PipedInputStream(pos)
){ 
    AtomicBoolean isDone = new AtomicBoolean(false);
    NumGen ng = new NumGen(pos,isDone);
    RunningAvg ra = new RunningAvg(pis,isDone);

    ng.start();
    ra.start();

    try {
       ng.join(); // wait for ng and ra to complete
       ra.join(); //
    } catch(InterruptedException ie) {}
} catch (IOException e) {
    e.printStackTrace();
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top