なぜ他の Java コードでは PipedInputStream / PipedOutputStream を使用しないのでしょうか?

StackOverflow https://stackoverflow.com/questions/484119

質問

最近このイディオムを発見したのですが、何か足りないものがあるのではないかと思っています。使われているのを見たことがありません。私が実際に扱ったほぼすべての Java コードは、この例のようなもの (たとえば、HttpClient および XML API を使用する) ではなく、データを文字列またはバッファに丸呑みすることを好みます。

    final LSOutput output; // XML stuff initialized elsewhere
    final LSSerializer serializer;
    final Document doc;
    // ...
    PostMethod post; // HttpClient post request
    final PipedOutputStream source = new PipedOutputStream();
    PipedInputStream sink = new PipedInputStream(source);
    // ...
    executor.execute(new Runnable() {
            public void run() {
                output.setByteStream(source);
                serializer.write(doc, output);
                try {
                    source.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }});

    post.setRequestEntity(new InputStreamRequestEntity(sink));
    int status = httpClient.executeMethod(post);

このコードでは、Unix パイプ スタイルの手法を使用して、XML データの複数のコピーがメモリ内に保持されるのを防ぎます。HTTP Post 出力ストリームと DOM Load/Save API を使用して、XML ドキュメントを HTTP リクエストのコンテンツとしてシリアル化します。私の知る限り、追加のコードはほとんどなく (ほんの数行のコードで) メモリの使用が最小限に抑えられます。 Runnable, PipedInputStream, 、 そして PipedOutputStream).

それで、このイディオムの何が問題なのでしょうか?このイディオムに問題がないのなら、なぜ私はそれを見なかったのでしょうか?

編集:明確にするために、 PipedInputStream そして PipedOutputStream どこにでも現れる定型的なバッファごとのコピーを置き換えます。また、処理されたデータを書き出すと同時に受信データを処理することもできます。OS パイプは使用しません。

役に立ちましたか?

解決

のJavadocする:

  

一般的に、データは、1つのスレッドが持つPipedInputStreamオブジェクトから読み出されたデータは、他のスレッドによって対応持つPipedOutputStreamに書き込まれます。それはスレッドをデッドロックする可能性として、単一のスレッドから両方のオブジェクトを使用しようとすると、推奨されません。

これは、部分的にそれがより一般的に使用されていない理由を説明することがあります。

私はもう一つの理由は、多くの開発者がその目的/ベネフィットを理解していないということであると仮定と思います。

他のヒント

あなたの例では、1 つで実行できる作業を実行するために 2 つのスレッドを作成しています。さらに、I/O 遅延も導入します。

もっと良い例はありますか?それとも、私はあなたの質問に答えただけでしょうか。


コメントの一部 (少なくとも私の見解) をメインの応答に取り込むには:

  • 同時実行によりアプリケーションは複雑になります。単一の線形データ フローを扱う代わりに、独立したデータ フローの順序付けを考慮する必要があります。場合によっては、特に複数のコア/CPU を活用して CPU 集中型の作業を実行できる場合には、複雑さが増すことは正当化される場合があります。
  • 同時操作のメリットが得られる状況では、通常、スレッド間のデータ フローを調整するためのより良い方法があります。たとえば、パイプされたストリームをオブジェクト ストリームでラップするのではなく、同時キューを使用してスレッド間でオブジェクトを渡します。
  • パイプされたストリームが良い解決策となる可能性があるのは、Unix パイプラインのように複数のスレッドがテキスト処理を実行している場合です (例:グレップ|選別)。

特定の例では、パイプされたストリームにより、HttpClient によって提供される既存の RequestEntity 実装クラスの使用が可能になります。この例は最終的には順次操作であり、同時実装の複雑さとオーバーヘッドの恩恵を受けることができないため、より良い解決策は、以下のように新しい実装クラスを作成することだと考えています。RequestEntity を匿名クラスとして示していますが、再利用可能であることから、これはファーストクラスのクラスである必要があります。

post.setRequestEntity(new RequestEntity()
{
    public long getContentLength()
    {
        return 0-1;
    }

    public String getContentType()
    {
        return "text/xml";
    }

    public boolean isRepeatable()
    {
        return false;
    }

    public void writeRequest(OutputStream out) throws IOException
    {
        output.setByteStream(out);
        serializer.write(doc, output);
    }
});

私も最近だけ持つPipedInputStream / PipedOutputStreamのクラスを発見しました。

私はSSH経由でリモートサーバ上でコマンドを実行する必要があるのEclipseプラグインを開発しています。私は JSCH を使用していますし、チャンネルAPIは、入力ストリームから読み込み、出力ストリームに書き込みます。しかし、私は、入力ストリームを介してコマンドを供給し、出力ストリームからの応答を読み取る必要があります。 PipedInput / OutputStreamのがでてくる厥ます。

import java.io.PipedInputStream;
import java.io.PipedOutputStream;

import com.jcraft.jsch.Channel;

Channel channel;
PipedInputStream channelInputStream = new PipedInputStream();
PipedOutputStream channelOutputStream = new PipedOutputStream();

channel.setInputStream(new PipedInputStream(this.channelOutputStream));
channel.setOutputStream(new PipedOutputStream(this.channelInputStream));
channel.connect();

// Write to channelInputStream
// Read from channelInputStream

channel.disconnect();

また、元の例に:いいえ、それは正確にいずれかのメモリ使用量を最小限に抑えていません。それは完全なバイト配列のレプリカよりも優れている一方で、それははるかに良いではありません - DOMツリー(S)は、メモリ内のバッファリングが行われ、構築されます。 しかし、この場合、バッファリングが遅くなります。余分なスレッドも作成されます - あなたは、単一のスレッド内からPipedInput / OutputStreamのペアを使用することはできません。

は時々PipedXxxStreamsは有用であるが、かなり頻繁に、彼らは適切なソリューションではありませんので、彼らはより多くを使用していない理由があります。彼らは、スレッド間通信のためにOKです、と私はそれが価値がある何のためにそれらを使用しているところです。それは代わりに、スレッド間で、SOAはサービスの間であることがほとんどな境界をプッシュする方法を与え、このため、多くのユースケースがないだけということです。

私は詳細を忘れて、しばらく前に何かのためにこれらのクラスを使用してみました。しかし、私は彼らの実装は致命的な欠陥があることを発見しました。私はそれが何だったか覚えていないことが、私はそれは彼らが時折デッドロック(そして、はい、もちろん私は別にスレッドでそれらを使用していたことを意味競合状態であったかもしれないという卑劣なメモリを持っている:彼らは単にでは使用できませんシングルスレッドと)なるように設計されていなかった。

私は問題があったかもしれないものを見ることができれば、私はandseeそのソースコードを見ている可能性があります。

ここでパイプが意味をなすのユースケースがあります:

doSomethingの(のinputStream、OutputStreamの):

を使用すると、XSLTマッパーまたはこのようなインターフェースを持っている暗号のlibなどのサードパーティのlibが、持っていると仮定します。そして、あなたは、ワイヤ上で送信する前に結果をバッファリングする必要はありません。 Apacheや他のクライアントは、ワイヤOutputStreamに直接アクセスを禁止します。あなたが得ることができる最も近いOutputStreamを取得している - オフセットで、ヘッダが書き込まれた後 - リクエストのエンティティオブジェクトに。これはボンネットの下にあるので、しかし、それは第三者のLIBに入力ストリームと出力ストリームを渡すためにはまだ十分ではありません。パイプは、この問題に対する良い解決策です。

ちなみに、私は、ApacheのHTTPクライアントAPIの反転を書いた [PipedApacheClientOutputStream] のこれはApache CommonsのHTTPクライアント4.3.4を使用してHTTP POSTのためのOutputStreamインタフェースを提供します。これは、パイプによるストリームは意味をなさないかもしれない例です。

java.ioパイプ(読み取り/書き込みバイトごとに)切り替えが多すぎるコンテキストを持っており、彼らのjava.nioのカウンターパートは、いくつかのNIO背景とチャンネルとスタッフの適切な使用法を持っているあなたを必要とし、これは、ブロッキングを使用してパイプの私自身の実装です

:単一プロデューサ/コンシューマは、高速実行し、うまくスケールしますのキュー
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }

それで、このイディオムの何が問題なのでしょうか?このイディオムに何も問題がない場合、なぜ私はそれを見なかったのですか?

編集:明確にするために、PipedInputStreamとPipedOutputStreamは、どこにでも表示されるボイラープレートバッファーごとのコピーを交換し、処理されたデータの書き出しと同時に入力データを処理することもできます。OSパイプは使用しません。

あなたはそれが何をするかについて述べましたが、なぜこれを行うのかについては述べていません。

これにより、使用されるリソース (CPU/メモリ) が削減されるか、パフォーマンスが向上すると信じている場合は、どちらも効果がありません。ただし、コードがより複雑になります。

基本的に、解決すべき問題がなくても解決策はあります。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top