subprocess.Popenを使用して複数のプロセスをパイプで接続するにはどうすればよいですか?

StackOverflow https://stackoverflow.com/questions/295459

  •  08-07-2019
  •  | 
  •  
役に立ちましたか?

解決

次のことに少し満足するでしょう。

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

作業の一部をシェルに委任します。パイプラインで2つのプロセスを接続します。

'script.awk'をPythonに書き直して、awkとパイプラインを削除するのは非常に楽しいでしょう。

編集。 awkが役に立たないことを示唆する理由のいくつか。

[コメントで返信するには理由が多すぎます。]

  1. Awkは重要な値のないステップを追加しています。 Pythonが処理しないawkの処理については、何もユニークなものはありません。

  2. awkからソートへのパイプライン処理により、大量のデータが処理されるため、経過処理時間が改善される場合があります。短いデータセットの場合、大きな利点はありません。 awk> fileの簡単な測定。ファイルのソート awk | sort は、同時実行の助けを明らかにします。並べ替えでは、並べ替えは1回限りのフィルターではないため、ほとんど役立ちません。

  3. 「ソートするPython」のシンプルさ処理(" Python to awk to sort"の代わりに)は、ここで尋ねられる質問の正確な種類を防ぎます。

  4. Python-awkよりも冗長ですが、awkには初心者には不透明で、専門家でない人には混乱を招くような暗黙のルールがあります。

  5. Awk(シェルスクリプト自体と同様)は、Yet Another Programming言語を追加します。これらすべてを1つの言語(Python)で実行できる場合、シェルとawkプログラミングを排除すると2つのプログラミング言語が排除され、誰かがタスクの価値を生み出す部分に集中できるようになります。

下の行:awkは大きな価値を追加できません。この場合、awkは正味コストです。この質問をする必要があるほど複雑になりました。 awkを削除すると、純益になります。

サイドバーパイプライン( a | b )の構築が非常に難しい理由。

シェルが a | b 以下を実行する必要があります。

  1. 元のシェルの子プロセスをフォークします。これは最終的にbになります。

  2. OSパイプを構築します。 (Python subprocess.PIPEではありません)が、共通バッファーを介して接続された2つの新しいファイル記述子を返す os.pipe()を呼び出します。この時点で、プロセスには、その親からのstdin、stdout、stderrに加えて、「a's stdout」になるファイルがあります。および「b's stdin」。

  3. 子をフォークします。子は、そのstdoutを新しいaのstdoutに置き換えます。 a プロセスを実行します。

  4. bの子が閉じると、そのstdinが新しいbのstdinに置き換えられます。 b プロセスを実行します。

  5. bの子は、aが完了するのを待ちます。

  6. 親はbが完了するのを待っています。

上記を再帰的に使用して a |を生成できると思います。 b | c 、ただし、長いパイプラインを暗黙的にかっこで囲む必要があります。 (b | c)。

Pythonには os.pipe() os.exec()、および os.fork()があるため、< code> sys.stdin および sys.stdout 、純粋なPythonで上記を行う方法があります。実際、 os.pipe()および subprocess.Popen を使用して、いくつかのショートカットを作成できる場合があります。

ただし、その操作をシェルに委任する方が簡単です。

他のヒント

import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

シェルパイプラインをエミュレートするには:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

シェルを呼び出さずに( 17.1.4.2。シェルの交換を参照してください。パイプライン):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum は、いくつかの構文シュガーを提供します。

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

類似物:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

is:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

http://www.python.org/doc/2.5.2/lib/node535.html はこれをかなりよくカバーしていました。理解できない部分がありますか?

プログラムはかなり似ていますが、2番目の Popen はファイルに対してstdout =を持ち、その .communicate()の出力は必要ありません。 。

@Cristianの回答に触発されました。同じ問題に出会ったが、コマンドは異なる。そこで、テストした例を紹介しますが、これは役立つと思われます。

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

これはテストされています。

行われたこと

  • パイプからの標準入力を使用して遅延 grep 実行を宣言しました。このコマンドは、パイプが ps のstdoutで満たされると、 ps コマンドの実行時に実行されます。
  • プライマリコマンド ps を呼び出し、stdoutを grep コマンドで使用されるパイプに送信しました。
  • Grepは、パイプからstdoutを取得するために通信しました。

この方法が気に入っているのは、サブプロセスインターフェースで優しくラップされた自然なパイプの概念だからです。

受け入れられた答えは、問題を回避することです。 以下は、複数のプロセスの出力を連鎖するスニペットです。 また、(多少)同等のシェルコマンドが出力されるため、実行して出力が正しいことを確認できます。

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

編集: pipes はWindowsで使用できますが、決定的に、Windowsでは実際には動作していないようです。以下のコメントを参照してください。

Python標準ライブラリには、これを処理するための pipes モジュールが含まれるようになりました。

https://docs.python.org/2/library/pipes.html https://docs.python.org/3.4/library/pipes.html

このモジュールがどのくらいの期間使用されたかはわかりませんが、このアプローチは subprocess をいじくり回すよりも非常に簡単なようです。

これまでの回答では重要な点が欠落していました。指摘されているように、シェルパイプラインの置換は基本的に正しいジオカーで。パイプの最後の要素で communicate を実行するのに十分ですほぼ

残りの問題は、入力データをパイプラインに渡すことです。複数のサブプロセスでは、最後の要素に対する単純な communicate(input_data)は機能しません-永久にハングします。次のように、パイプラインと子を手動で作成する必要があります。

import os
import subprocess

input = """\
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

ここで、子はパイプを介して入力を提供し、親は予想どおりに動作するCommunicate()を呼び出します。このアプローチを使用すると、「作業の一部をシェルに委任」することなく、任意の長いパイプラインを作成できます。残念ながら、サブプロセスのドキュメントではこれについて言及されていません。

パイプなしで同じ効果を達成する方法があります:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

今、 p_awk には stdin = tf を使用します。好みの問題です。

信号処理が異なるため、上記はまだbashパイプラインと100%同等ではありません。これは、 sort の出力を切り捨てる別のパイプ要素を追加した場合に確認できます。 head -n 10 。上記のコードでは、 sort は&quot; Broken pipe&quot;を出力します。 stderr へのエラーメッセージ。シェルで同じパイプラインを実行すると、このメッセージは表示されません。 (それが唯一の違いですが、 stdout の結果は同じです)。理由は、Pythonの Popen SIG_IGN SIGPIPE に設定しているのに対し、シェルは SIG_DFL に残しているためと思われます。 sort のシグナル処理は、これら2つのケースで異なります。

私にとって、以下のアプローチは最もクリーンで読みやすいです

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

次のように呼び出すことができます:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top