Как использовать subprocess.Popen для соединения нескольких процессов с помощью каналов?
-
08-07-2019 - |
Вопрос
Как мне выполнить следующую команду оболочки, используя Python subprocess
модуль?
echo "input data" | awk -f script.awk | sort > outfile.txt
Входные данные будут поступать из строки, поэтому мне на самом деле не нужно echo
.Я зашел так далеко, может кто-нибудь объяснить, как мне заставить его пройти через трубку? sort
слишком?
p_awk = subprocess.Popen(["awk","-f","script.awk"],
stdin=subprocess.PIPE,
stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )
ОБНОВЛЯТЬ:Обратите внимание: хотя принятый ниже ответ на самом деле не отвечает на поставленный вопрос, я считаю, что С.Лотт прав, и лучше вообще избегать решения этой проблемы!
Решение
Вы были бы немного более довольны следующим.
import subprocess
awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )
Делегируйте часть работы оболочке.Пусть он соединит два процесса конвейером.
Вы были бы намного счастливее переписать «script.awk» на Python, исключив awk и конвейер.
Редактировать.Некоторые причины полагать, что awk не помогает.
[Слишком много причин отвечать в комментариях.]
Awk добавляет шаг, не имеющий существенной ценности.В обработке awk нет ничего уникального, с чем не справляется Python.
Конвейерная передача от awk к сортировке для больших наборов данных может сократить время обработки.Для коротких наборов данных это не имеет существенного преимущества.Быстрое измерение
awk >file ; sort file
иawk | sort
поможет выявить параллелизм.При сортировке это помогает редко, поскольку сортировка не является однократным фильтром.Простота обработки «Python для сортировки» (вместо «Python для awk для сортировки») не позволяет задавать здесь именно такие вопросы.
Python, хотя и более многословен, чем awk, но также является явным, поскольку awk имеет определенные неявные правила, которые непонятны новичкам и сбивают с толку неспециалистов.
Awk (как и сам сценарий оболочки) добавляет еще один язык программирования.Если все это можно сделать на одном языке (Python), отказ от оболочки и программирования на awk исключает использование двух языков программирования, позволяя кому-то сосредоточиться на тех частях задачи, которые приносят пользу.
Нижняя граница:awk не может принести существенной пользы.В данном случае awk — это чистые затраты;это добавило достаточно сложности, и пришлось задать этот вопрос.Удаление awk принесет чистую выгоду.
Боковая панель Зачем строить трубопровод(a | b
) это так сложно.
Когда оболочка сталкивается с a | b
он должен сделать следующее.
Создайте дочерний процесс исходной оболочки.В конечном итоге это станет b.
Создайте канал ОС.(не подпроцесс Python.PIPE), но вызовите
os.pipe()
который возвращает два новых файловых дескриптора, связанных через общий буфер.На этом этапе процесс имеет стандартный ввод, стандартный вывод, стандартный вывод от своего родителя, а также файл, который будет «стандартным выводом а» и «стандартным вводом b».Раскошелитесь на ребенка.Дочерний элемент заменяет свой стандартный вывод новым стандартным выводом a.Исполнить
a
процесс.Дочерний элемент b закрывается, заменяя свой стандартный ввод новым стандартным вводом b.Исполнить
b
процесс.Ребенок b ждет завершения a.
Родитель ждет завершения b.
Я думаю, что вышеизложенное можно использовать рекурсивно для создания a | b | c
, но вам придется неявно заключать в круглые скобки длинные конвейеры, рассматривая их так, как будто они a | (b | c)
.
Поскольку Python имеет os.pipe()
, os.exec()
и os.fork()
, и вы можете заменить sys.stdin
и sys.stdout
, есть способ сделать это на чистом Python.Действительно, вы можете найти некоторые ярлыки, используя os.pipe()
и subprocess.Popen
.
Однако проще делегировать эту операцию оболочке.
Другие советы
import subprocess
some_string = b'input_data'
sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in,
stdin=subprocess.PIPE).communicate(some_string)
Чтобы эмулировать конвейер оболочки:
from subprocess import check_call
check_call('echo "input data" | a | b > outfile.txt', shell=True)
без вызова оболочки (см. 17.1.4.2.Замена скорлупового трубопровода):
#!/usr/bin/env python
from subprocess import Popen, PIPE
a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
with a.stdout, open("outfile.txt", "wb") as outfile:
b = Popen(["b"], stdin=a.stdout, stdout=outfile)
a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already
plumbum
предоставляет некоторый синтаксический сахар:
#!/usr/bin/env python
from plumbum.cmd import a, b # magic
(a << "input data" | b > "outfile.txt")()
Аналог:
#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt
является:
#!/usr/bin/env python
from plumbum.cmd import awk, sort
(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
http://www.python.org/doc/2.5.2/lib/node535.html достаточно хорошо это описали. Есть ли какая-то часть этого, которую вы не поняли?
Ваша программа будет очень похожа, но у второго Popen
будет stdout = в файл, и вам не понадобится вывод его .communicate ()
. р>
Вдохновленный ответом @Cristian.Я столкнулся с той же проблемой, но с другой командой.Итак, я привожу свой проверенный пример, который, как я считаю, может быть полезен:
grep_proc = subprocess.Popen(["grep", "rabbitmq"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()
Это проверено.
Что было сделано
- Объявлен ленивым
grep
выполнение с помощью стандартного ввода из канала.Эта команда будет выполнена вps
выполнение команды, когда канал будет заполнен стандартным выводомps
. - Вызывается основная команда
ps
со стандартным выводом, направленным на канал, используемыйgrep
команда. - Grep связался, чтобы получить стандартный вывод из канала.
Мне нравится этот способ, потому что это естественная трубочная концепция, аккуратно обернутая subprocess
интерфейсы.
Принятый ответ обходит стороной проблему. Вот фрагмент кода, который объединяет результаты нескольких процессов: Обратите внимание, что он также печатает (несколько) эквивалентную команду оболочки, чтобы вы могли запустить ее и убедиться в правильности вывода.
#!/usr/bin/env python3
from subprocess import Popen, PIPE
# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)
print("Output from last process : " + (p3.communicate()[0]).decode())
# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
РЕДАКТИРОВАТЬ: pipe
доступен в Windows, но, что особенно важно, на самом деле не работает в Windows. Смотрите комментарии ниже.
Стандартная библиотека Python теперь включает модуль pipe
для этого:
https://docs.python.org/2/library/pipes.html , https://docs.python.org/3.4/library/pipes.html р>
Я не уверен, как долго работает этот модуль, но этот подход, по-видимому, намного проще, чем копаться в подпроцессе
.
В предыдущих ответах упущен важный момент. Замена конвейера оболочки в основном правильная, как указано Geocar. почти достаточно для запуска общаться
с последним элементом канала.
Остается проблема с передачей входных данных в конвейер. С несколькими подпроцессами простое общаться (input_data)
с последним элементом не работает - оно зависает навсегда. Вам нужно вручную создать конвейер и дочерний элемент, например:
import os
import subprocess
input = """\
input data
more input
""" * 10
rd, wr = os.pipe()
if os.fork() != 0: # parent
os.close(wr)
else: # child
os.close(rd)
os.write(wr, input)
os.close(wr)
exit()
p_awk = subprocess.Popen(["awk", "{ print $2; }"],
stdin=rd,
stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"],
stdin=p_awk.stdout,
stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())
Теперь дочерний элемент предоставляет входные данные через канал, а родительские вызовы взаимодействуют (), что работает, как и ожидалось. При таком подходе можно создавать произвольные длинные конвейеры, не прибегая к «делегированию части работы оболочке». К сожалению, документация подпроцесса не упоминает об этом.
Есть способы достичь того же эффекта без труб:
from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)
Теперь используйте stdin = tf
для p_awk
. Это вопрос вкуса, что вы предпочитаете.
Вышесказанное все еще не на 100% эквивалентно bash-конвейерам, потому что обработка сигналов отличается. Это можно увидеть, если добавить еще один элемент канала, который усекает выходные данные sort
, например head -n 10
. С указанным выше кодом sort
напечатает " Сломанная труба " сообщение об ошибке stderr
. Вы не увидите это сообщение при запуске того же конвейера в оболочке. (Это единственное отличие, результат в stdout
тот же). Кажется, причина в том, что Popen
в python устанавливает SIG_IGN
для SIGPIPE
, тогда как оболочка оставляет его в SIG_DFL
, и Обработка сигналов sort
в этих двух случаях различна.
Для меня приведенный ниже подход является наиболее чистым и легким для чтения
from subprocess import Popen, PIPE
def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
with open(output_filename, 'wb') as out_f:
p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
p1.communicate(input=bytes(input_s))
p1.wait()
p2.stdin.close()
p2.wait()
который можно назвать так:
string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')