如何使用 subprocess.Popen 通过管道连接多个进程?
-
08-07-2019 - |
题
如何使用 Python 执行以下 shell 命令 subprocess
模块?
echo "input data" | awk -f script.awk | sort > outfile.txt
输入数据将来自字符串,所以我实际上不需要 echo
. 。我已经走到这一步了,谁能解释一下我是如何让它通过管道的 sort
也?
p_awk = subprocess.Popen(["awk","-f","script.awk"],
stdin=subprocess.PIPE,
stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )
更新: :请注意,虽然下面接受的答案实际上并没有回答所提出的问题,但我相信 S.Lott 是对的,最好首先避免解决这个问题!
解决方案
你会对以下内容感到高兴。
import subprocess
awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )
将部分工作委托给shell。让它用管道连接两个进程。
你会更乐意将'script.awk'改写成Python,消除awk和管道。
修改即可。提出awk没有帮助的一些原因。
[有太多理由通过评论回复。]
-
Awk正在增加一个没有重大价值的步骤。有关Python无法处理的awk处理没有什么独特之处。
-
对于大型数据集,从awk到sort的流水线操作可以提高处理时间。对于短数据集,它没有显着的好处。快速测量
awk >file ; sort file
和awk | sort
将揭示并发性有帮助。使用sort,它很少有帮助,因为sort不是一次性过滤器。 -
<!>的简单性; Python用于排序<!> quot;处理(而不是<!>“; Python to awk to sort <!> quot;)可以防止在这里提出确切的问题。
-
Python - 虽然比awk更冗长 - 也是明确的,其中awk具有某些隐含的规则,这些规则对新手不透明,并且对非专业人员造成混淆。
-
Awk(就像shell脚本本身一样)添加了另一种编程语言。如果所有这些都可以用一种语言(Python)完成,那么消除shell和awk编程就会消除两种编程语言,从而允许某人专注于任务的价值生成部分。
醇>
-
分叉原始shell的子进程。这最终将成为b。
-
构建一个os管道。 (不是Python subprocess.PIPE)但是调用
a
,它返回两个通过公共缓冲区连接的新文件描述符。此时,进程具有来自其父级的stdin,stdout,stderr,以及将<!>“a的stdout <!>”的文件。和<!>“; b的stdin <!>”; -
分叉一个孩子。孩子用新的标准替换它的标准输出。执行
b
流程。 -
b子关闭用新b的标准输入替换其标准输入。执行
a | b | c
流程。 -
b孩子等待a完成。
-
父母正在等待b完成。
醇>
底线:awk无法添加重要价值。在这种情况下,awk是净成本;它增加了足够的复杂性,有必要提出这个问题。删除awk将是净收益。
补充工具栏为什么构建管道(a | b
)非常困难。
当shell遇到os.pipe()
时,必须执行以下操作。
我认为上面的内容可以递归地用于生成a | (b | c)
,但你必须隐式地将长管道括起来,将它们视为os.exec()
。
由于Python有os.fork()
,sys.stdin
和sys.stdout
,并且你可以替换subprocess.Popen
和<=>,所以有一种方法可以在纯Python中完成上述操作。实际上,您可以使用<=>和<=>来计算一些快捷方式。
但是,将该操作委托给shell更容易。
其他提示
import subprocess
some_string = b'input_data'
sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in,
stdin=subprocess.PIPE).communicate(some_string)
模拟shell管道:
from subprocess import check_call
check_call('echo "input data" | a | b > outfile.txt', shell=True)
不调用shell(参见 17.1.4.2。更换shell管道):
#!/usr/bin/env python
from subprocess import Popen, PIPE
a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
with a.stdout, open("outfile.txt", "wb") as outfile:
b = Popen(["b"], stdin=a.stdout, stdout=outfile)
a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already
plumbum
提供了一些语法糖:
#!/usr/bin/env python
from plumbum.cmd import a, b # magic
(a << "input data" | b > "outfile.txt")()
类似于:
#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt
是:
#!/usr/bin/env python
from plumbum.cmd import awk, sort
(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
http://www.python.org/doc/2.5.2/lib/node535.html 很好地介绍了这一点。你有什么不明白的一部分吗?
你的程序非常相似,但是第二个Popen
会有stdout =到一个文件,你不需要它的输出.communicate()
。
受到@Cristian 的回答的启发。我遇到了同样的问题,但使用了不同的命令。因此,我将我的测试示例放在其中,我相信这可能会有所帮助:
grep_proc = subprocess.Popen(["grep", "rabbitmq"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()
这是经过测试的。
已经做了什么
- 宣布懒惰
grep
使用管道中的标准输入执行。该命令将在ps
当管道将充满标准输出时命令执行ps
. - 调用主命令
ps
将 stdout 定向到所使用的管道grep
命令。 - Grep 进行通信以从管道获取标准输出。
我喜欢这种方式,因为它是自然的烟斗概念,轻轻地包裹着 subprocess
接口。
接受的答案是回避这个问题。 这是一个链接多个进程输出的片段: 请注意,它还会打印(某种程度上)等效的shell命令,以便您可以运行它并确保输出正确。
#!/usr/bin/env python3
from subprocess import Popen, PIPE
# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)
print("Output from last process : " + (p3.communicate()[0]).decode())
# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
编辑 pipes
可在Windows上使用,但至关重要的是,在Windows上似乎没有工作。见下面的评论。
Python标准库现在包含用于处理此问题的subprocess
模块:
https://docs.python.org/2/library/pipes.html , https://docs.python.org/3.4/library/pipes.html
我不确定这个模块已经存在了多长时间,但这种方法似乎比使用<=>进行捣乱要简单得多。
以前的答案错过了重要的一点。 更换shell管道基本上是正确的,正如所指出的那样通过geocar。 几乎足以在管道的最后一个元素上运行communicate
。
剩下的问题是将输入数据传递给管道。对于多个子进程,最后一个元素上的简单communicate(input_data)
不起作用 - 它会永久挂起。您需要手动创建管道和子项,如下所示:
import os
import subprocess
input = """\
input data
more input
""" * 10
rd, wr = os.pipe()
if os.fork() != 0: # parent
os.close(wr)
else: # child
os.close(rd)
os.write(wr, input)
os.close(wr)
exit()
p_awk = subprocess.Popen(["awk", "{ print $2; }"],
stdin=rd,
stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"],
stdin=p_awk.stdout,
stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())
现在,子进程通过管道提供输入,父进程调用communic(),它按预期工作。使用这种方法,您可以创建任意长管道,而无需使用<!>将部分工作委托给shell <!>“;不幸的是,子流程文档没有提到这一点。
有些方法可以在没有管道的情况下实现相同的效果:
from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)
现在使用stdin=tf
代表p_awk
。这是你喜欢的品味问题。
由于信号处理不同,上述仍然不是100%等同于bash管道。如果添加另一个截断sort
输出的管道元素,可以看到这个,例如head -n 10
。使用上面的代码,stderr
将打印<!>“Broken pipe <!>”;错误消息到stdout
。在shell中运行相同的管道时,您将看不到此消息。 (这是唯一的区别,但Popen
中的结果是相同的)。原因似乎是python的SIG_IGN
为SIGPIPE
设置SIG_DFL
,而shell将它留在<=>,而<=>的信号处理在这两种情况下是不同的。
对我来说,下面的方法是最干净,最容易阅读的
from subprocess import Popen, PIPE
def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
with open(output_filename, 'wb') as out_f:
p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
p1.communicate(input=bytes(input_s))
p1.wait()
p2.stdin.close()
p2.wait()
可以像这样调用:
string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')