Question

I am trying to prototype a program that would read a stream then spin up new processing streams in a thread pool as the main thread got further along in the stream. I'm running into a problem with PipedStreams and CountDownLatch.

When I run the following code with the "latch.await()" commented out in the main thread I get a "Write end dead" error. When I run with the "latch.await()" uncommented the program hangs and niether thread exits.

Any idea what I've done wrong or advice on how better to process a single stream in place with multiple processing streams. Thx.

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.CountDownLatch;
//import java.util.Arrays;

public class RunTestPipePool {

    final static int BUFFER_SIZE = 8;

    static class PipeWriter extends Thread implements Runnable {
        InputStream in;
        OutputStream out;
        CountDownLatch latch;

        public PipeWriter(InputStream in, OutputStream out, CountDownLatch latch) {
            this.in = in;
            this.out = out;
            this.latch = latch;
        }

        public void run() {
            try {
                byte[] buffer = new byte[BUFFER_SIZE];
                int n = 0;
                while ((n = in.read(buffer)) >= 0) {
//                  System.out.println("PipeWriter Processing: " + new String(Arrays.copyOfRange(buffer,0,n)));
                    out.write(buffer,0,n);
                }
                latch.countDown();
                out.close();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                System.out.println("PipeWriter Terminating");
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        PipedInputStream writeIn = new PipedInputStream();
        PipedOutputStream readOut = new PipedOutputStream(writeIn);

        InputStream reader = new BufferedInputStream(System.in);
        PipeWriter writer = new PipeWriter(writeIn,System.out,latch);

        writer.start();

        byte[] buffer = new byte[BUFFER_SIZE];
        int n = 0;
        while ((n = reader.read(buffer)) >= 0) {
//          System.out.println("RunTestPipePool Processing: " + new String(Arrays.copyOfRange(buffer,0,n)));
            readOut.write(buffer,0,n);
        }

        latch.await();
        reader.close();

        System.out.println("RunTestPipePool Terminating");
    }
}

Output with latch.await() commented out:

C:\Users\matty\Documents\workspace\test_pipe\bin>echo hello world | java -jar Ru
nTestPipePool.jar
RunTestPipePool Terminating
hello world
java.io.IOException: Write end dead
        at java.io.PipedInputStream.read(PipedInputStream.java:294)
        at java.io.PipedInputStream.read(PipedInputStream.java:361)
        at java.io.InputStream.read(InputStream.java:82)
        at RunTestPipePool$PipeWriter.run(RunTestPipePool.java:28)
PipeWriter Terminating

Output with latch.await() uncommented:

C:\Users\matty\Documents\workspace\test_pipe\bin>echo hello world | java -jar Ru
nTestPipePool.jar
hello world

C:\Users\matty\Documents\workspace\test_pipe\bin>

Modified code:

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.CountDownLatch;
//import java.util.Arrays;

public class RunTestPipeStack {

    final static int BUFFER_SIZE = 8;

    static class PipeWriter extends Thread implements Runnable {
        InputStream in;
        OutputStream out;
        CountDownLatch latch;

        public PipeWriter(InputStream in, OutputStream out, CountDownLatch latch) {
            this.in = in;
            this.out = out;
            this.latch = latch;
        }

        public void run() {
            try {
                byte[] buffer = new byte[BUFFER_SIZE];
                int n = 0;
                while (in.available() != 0 && (n = in.read(buffer)) >= 0) {
//                  System.out.println("PipeWriter Processing: " + new String(Arrays.copyOfRange(buffer,0,n)));
                    out.write(buffer,0,n);
                }
                System.out.println("PipeWriter Terminating");
                in.close();
                out.close();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        PipedInputStream writeIn = new PipedInputStream();
        PipedOutputStream readOut = new PipedOutputStream(writeIn);

        InputStream reader = new BufferedInputStream(System.in);
        PipeWriter writer = new PipeWriter(writeIn,System.out,latch);

        writer.start();

        byte[] buffer = new byte[BUFFER_SIZE];
        int n = 0;
        while ((n = reader.read(buffer)) >= 0) {
            //          System.out.println("RunTestPipePool Processing: " + new String(Arrays.copyOfRange(buffer,0,n)));
            readOut.write(buffer,0,n);
        }

        reader.close();

        System.out.println("RunTestPipePool Terminating");
    }
}

Modified output:

C:\Users\matty\Documents\workspace\test_pipe\bin>echo hello world | java -jar Ru
nTestPipeStack.jar
RunTestPipePool Terminating
hello world
PipeWriter Terminating

C:\Users\matty\Documents\workspace\test_pipe\bin>
Was it helpful?

Solution

Usage of await() is correct but the reason its not working is that the thread is in blocking state with in.read() - waiting for next set of bytes. But after "hello world", there isn't anything coming. After completing the read, check if there are more bytes available. If they are not close the streams and break out of the loop. Also you may want to add in!=null check in while loop.

 while (in!=null && (n = in.read(buffer)) >= 0) {
               System.out.println("PipeWriter Processing: " + new  
                                String(Arrays.copyOfRange(buffer,0,n)));
                out.write(buffer,0,n);


                if(in.available()==0)   
                {
                    latch.countDown();
                    out.close();    
                    in.close();
                    break;
                }

            }
            System.out.println("Completed the PipeWriter loop");

Hope this helps!

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top