Question

I've discovered this idiom recently, and I am wondering if there is something I am missing. I've never seen it used. Nearly all Java code I've worked with in the wild favors slurping data into a string or buffer, rather than something like this example (using HttpClient and XML APIs for example):

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

That code uses a Unix-piping style technique to prevent multiple copies of the XML data being kept in memory. It uses the HTTP Post output stream and the DOM Load/Save API to serialize an XML Document as the content of the HTTP request. As far as I can tell it minimizes the use of memory with very little extra code (just the few lines for Runnable, PipedInputStream, and PipedOutputStream).

So, what's wrong with this idiom? If there's nothing wrong with this idiom, why haven't I seen it?

EDIT: to clarify, PipedInputStream and PipedOutputStream replace the boilerplate buffer-by-buffer copy that shows up everywhere, and they also allow you to process incoming data concurrently with writing out the processed data. They don't use OS pipes.
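For reference, the buffer-by-buffer copy mentioned above usually looks something like the following minimal sketch. The names StreamCopy and copy are made up for illustration, and the 8192-byte buffer size is an arbitrary choice.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class StreamCopy {

    // Copies everything from `in` to `out` one buffer at a time and
    // returns the number of bytes copied. Neither stream is closed here.
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192]; // arbitrary size, chosen for illustration
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }
}
```

Only one buffer's worth of data is held in memory at a time, but reading and writing happen on the same thread.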


Solution

From the Javadocs:

Typically, data is read from a PipedInputStream object by one thread and data is written to the corresponding PipedOutputStream by some other thread. Attempting to use both objects from a single thread is not recommended, as it may deadlock the thread.

This may partially explain why it is not more commonly used.
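As a rough illustration of the two-thread arrangement the Javadoc describes, here is a minimal sketch in which one thread writes to the pipe while the calling thread reads from it. PipedRoundTrip and roundTrip are hypothetical names, not part of any library.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class PipedRoundTrip {

    // Sends `text` through a pipe: a separate thread writes, the caller reads.
    static String roundTrip(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        try (PipedInputStream sink = new PipedInputStream()) {
            PipedOutputStream source = new PipedOutputStream(sink);

            Thread writer = new Thread(() -> {
                // Closing the output side is what signals end-of-stream to the reader.
                try (PipedOutputStream out = source) {
                    out.write(payload);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Read on the calling thread until the writer closes its end.
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            int n;
            while ((n = sink.read(chunk)) != -1) {
                received.write(chunk, 0, n);
            }
            writer.join();
            return new String(received.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

If the write were done on the reading thread instead, a payload larger than the pipe's internal buffer would block forever, because nothing would be draining the pipe.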

I'd assume another reason is that many developers do not understand its purpose / benefit.

OTHER TIPS

In your example you're creating two threads to do work that could be done by one, and introducing I/O delays into the mix.

Do you have a better example? Or did I just answer your question?


To pull some of the comments (at least my view of them) into the main response:

  • Concurrency introduces complexity into an application. Instead of dealing with a single linear flow of data, you now have to be concerned about sequencing of independent data flows. In some cases, the added complexity may be justified, particularly if you can leverage multiple cores/CPUs to do CPU-intensive work.
  • If you are in a situation where you can benefit from concurrent operations, there's usually a better way to coordinate the flow of data between threads. For example, passing objects between threads using a concurrent queue, rather than wrapping the piped streams in object streams.
  • Where a piped stream may be a good solution is when you have multiple threads performing text processing, à la a Unix pipeline (e.g., grep | sort).
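The concurrent-queue alternative from the second point could be sketched roughly as follows. This is only an illustration under assumptions: QueueHandoff, transfer, the END sentinel, and the queue capacity of 16 are all invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueHandoff {

    // Sentinel marking the end of the data; compared by identity, not by value.
    private static final String END = new String("END");

    // A producer thread puts items on a bounded queue; the caller consumes them.
    static List<String> transfer(List<String> items) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);

        Thread producer = new Thread(() -> {
            try {
                for (String item : items) {
                    queue.put(item); // blocks while the queue is full
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<String> received = new ArrayList<>();
        try {
            // take() blocks while the queue is empty.
            for (String item = queue.take(); item != END; item = queue.take()) {
                received.add(item.toUpperCase());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }
}
```

The queue hands over whole objects, so no serialization to bytes and back is needed between the two threads.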

In the specific example, the piped stream allows use of an existing RequestEntity implementation class provided by HttpClient. I believe that a better solution is to create a new implementation class, as below, because the example is ultimately a sequential operation that cannot benefit from the complexity and overhead of a concurrent implementation. While I show the RequestEntity as an anonymous class, reusability suggests that it should be a named, top-level class.

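post.setRequestEntity(new RequestEntity()
{
    // A negative value tells HttpClient that the length is not known up front.
    public long getContentLength()
    {
        return -1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    // Reports that writeRequest should not be called more than once.
    public boolean isRepeatable()
    {
        return false;
    }

    // Serialize the document directly into the request body;
    // no pipe and no second thread are needed.
    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});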

I too only discovered the PipedInputStream/PipedOutputStream classes recently.

I am developing an Eclipse plug-in that needs to execute commands on a remote server via SSH. I am using JSch, whose Channel API reads from an input stream and writes to an output stream. But I need to feed commands through the input stream and read the responses from an output stream. That's where PipedInput/OutputStream comes in.

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write commands to channelOutputStream
// Read responses from channelInputStream

channel.disconnect();

Also, back to the original example: no, it does not exactly minimize memory usage either. The DOM tree still gets built and in-memory buffering still takes place; while that is better than keeping full byte-array replicas, it's not that much better. Buffering in this case will also be slower, and an extra thread is created, because you cannot use a PipedInput/OutputStream pair from within a single thread.

Sometimes PipedXxxStreams are useful, but the reason they are not used more is that quite often they are not the right solution. They are OK for inter-thread communication, and that's where I have used them, for what it's worth. It's just that there aren't that many use cases for this, given how SOA pushes most such boundaries to be between services instead of between threads.

I tried using these classes a while back for something; I forget the details. But I did discover that their implementation is fatally flawed. I can't remember exactly what it was, but I have a vague memory that it may have been a race condition which meant that they occasionally deadlocked. (And yes, of course I was using them in separate threads: they simply aren't usable in a single thread and weren't designed to be.)

I might have a look at their source code and see if I can work out what the problem might have been.

Here's a use case where pipes make sense:

Suppose you have a third-party lib, such as an XSLT mapper or a crypto lib, that has an interface like this: doSomething(inputStream, outputStream). And you do not want to buffer the result before sending it over the wire. Apache and other clients disallow direct access to the wire output stream. The closest you can get is obtaining the output stream, at an offset after the headers are written, in a request entity object. But since this happens under the hood, it's still not enough to let you pass an input stream and an output stream to the third-party lib. Pipes are a good solution to this problem.
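A rough sketch of that adaptation is below. It rests on assumptions: StreamTransform is a hypothetical stand-in for the third-party doSomething(inputStream, outputStream) call, and PipeAdapter, asInputStream, and upperCase are names invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PipeAdapter {

    /** Hypothetical stand-in for a third-party doSomething(inputStream, outputStream). */
    interface StreamTransform {
        void apply(InputStream in, OutputStream out) throws IOException;
    }

    // Runs the transform on a worker thread and exposes its output as an
    // InputStream that a client library can consume on the calling thread.
    static InputStream asInputStream(StreamTransform transform, InputStream input,
                                     ExecutorService executor) throws IOException {
        PipedInputStream sink = new PipedInputStream();
        PipedOutputStream source = new PipedOutputStream(sink);
        executor.submit(() -> {
            // Closing the pipe when the transform finishes signals end-of-stream.
            try (PipedOutputStream out = source) {
                transform.apply(input, out);
            }
            return null;
        });
        return sink;
    }

    // Small demonstration: upper-cases ASCII text through the adapter.
    static String upperCase(String text) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StreamTransform toUpper = (in, out) -> {
            int b;
            while ((b = in.read()) != -1) {
                out.write(Character.toUpperCase(b));
            }
        };
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        try (InputStream result = asInputStream(toUpper, new ByteArrayInputStream(bytes), executor)) {
            ByteArrayOutputStream collected = new ByteArrayOutputStream();
            byte[] chunk = new byte[256];
            int n;
            while ((n = result.read(chunk)) != -1) {
                collected.write(chunk, 0, n);
            }
            return new String(collected.toByteArray(), StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

One caveat with this sketch: if the transform throws, the exception ends up in the Future returned by submit, which is discarded here, so the reader just sees the stream end early. Real code would want to keep that Future and check it.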

Incidentally, I wrote an inversion of Apache's HTTP Client API [PipedApacheClientOutputStream] which provides an OutputStream interface for HTTP POST using Apache Commons HTTP Client 4.3.4. This is an example where Piped Streams might make sense.

java.io pipes involve too much context switching (per byte read/write), and their java.nio counterpart requires some NIO background and proper use of channels. This is my own implementation of a pipe using a blocking queue, which for a single producer/consumer should perform fast and scale well:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
}

You have stated what it does but haven't stated why you are doing this.

If you believe that this will either reduce resources used (cpu/memory) or improve performance then it won't do either. However it will make your code more complex.

Basically, you have a solution without a problem for it to solve.

Licensed under: CC-BY-SA with attribution