كيف يمكنني استخدام subprocess.Popen لتوصيل عمليات متعددة عبر الأنابيب؟

StackOverflow https://stackoverflow.com/questions/295459

  •  08-07-2019
  •  | 
  •  

سؤال

كيف يمكنني تنفيذ أمر shell التالي باستخدام Python subprocess وحدة؟

echo "input data" | awk -f script.awk | sort > outfile.txt

ستأتي بيانات الإدخال من سلسلة، لذا لا أحتاجها فعليًا echo.لقد وصلت إلى هذا الحد، هل يمكن لأي أحد أن يشرح لي كيف يمكنني توصيله عبر الأنابيب sort أيضاً؟

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

تحديث:لاحظ أنه على الرغم من أن الإجابة المقبولة أدناه لا تجيب فعليًا على السؤال كما هو مطروح، أعتقد أن S.Lott على حق ومن الأفضل تجنب الاضطرار إلى حل هذه المشكلة في المقام الأول!

هل كانت مفيدة؟

المحلول

ستكون أكثر سعادة قليلاً مع ما يلي.

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data\n" )

تفويض جزء من العمل إلى الصدفة.دعها تربط عمليتين بخط أنابيب.

سيكون من دواعي سرورك إعادة كتابة "script.awk" في لغة Python، والقضاء على awk وخط الأنابيب.

يحرر.بعض الأسباب التي تجعلنا نقترح أن awk لا يساعد.

[هناك أسباب كثيرة جدًا للرد عبر التعليقات.]

  1. يقوم Awk بإضافة خطوة ليس لها قيمة كبيرة.لا يوجد شيء فريد في معالجة awk لا تتعامل معه Python.

  2. قد يؤدي التدفق من AWK إلى الفرز، بالنسبة لمجموعات كبيرة من البيانات، إلى تحسين وقت المعالجة المنقضي.بالنسبة لمجموعات قصيرة من البيانات، ليس لها فائدة كبيرة.قياس سريع لل awk >file ; sort file و awk | sort سوف تكشف عن التزامن يساعد.مع الفرز، نادرًا ما يكون ذلك مفيدًا لأن الفرز ليس مرشحًا لمرة واحدة.

  3. إن بساطة معالجة "Python للفرز" (بدلاً من "Python to awk tosort") تمنع طرح الأسئلة الدقيقة هنا.

  4. Python - على الرغم من أنها أكثر كلامًا من awk - فهي أيضًا صريحة حيث تحتوي awk على قواعد ضمنية معينة غير شفافة للمبتدئين، ومربكة لغير المتخصصين.

  5. يضيف Awk (مثل برنامج Shell النصي نفسه) لغة برمجة أخرى.إذا كان من الممكن القيام بكل هذا بلغة واحدة (بايثون)، فإن إزالة لغة البرمجة shell و awk ستؤدي إلى إلغاء لغتي برمجة، مما يسمح لشخص ما بالتركيز على الأجزاء المنتجة للقيمة من المهمة.

الحد الأدنى:awk لا يمكن أن يضيف قيمة كبيرة.في هذه الحالة، awk هي تكلفة صافية؛لقد أضاف تعقيدًا كافيًا لدرجة أنه كان من الضروري طرح هذا السؤال.ستكون إزالة awk مكسبًا صافيًا.

الشريط الجانبي لماذا بناء خط أنابيب (a | b) صعب جدا.

عندما تواجه القشرة a | b عليها أن تفعل ما يلي.

  1. شوكة عملية فرعية من القشرة الأصلية.سيصبح هذا في النهاية ب.

  2. بناء أنبوب نظام التشغيل.(ليست عملية فرعية لـ Python.PIPE) ولكن اتصل os.pipe() الذي يقوم بإرجاع واصفي ملف جديدين متصلين عبر مخزن مؤقت مشترك.في هذه المرحلة، تحتوي العملية على stdin وstdout وstderr من أصلها، بالإضافة إلى ملف سيكون "a's stdout" و"b's stdin".

  3. شوكة طفل.يستبدل الطفل stdout الخاص به بـ a's stdout الجديد.تنفيذ a عملية.

  4. يستبدل الطفل b الذي يغلق stdin الخاص به بـ stdin b الجديد.تنفيذ b عملية.

  5. ينتظر الطفل b حتى يكتمل.

  6. الوالد ينتظر اكتمال b.

أعتقد أنه يمكن استخدام ما ورد أعلاه بشكل متكرر لتفرخ a | b | c, ، ولكن عليك أن تضع بين قوسين ضمنيًا خطوط الأنابيب الطويلة، وأن تعاملها كما لو كانت كذلك a | (b | c).

منذ بايثون لديها os.pipe(), os.exec() و os.fork(), ، ويمكنك استبداله sys.stdin و sys.stdout, ، هناك طريقة للقيام بما ورد أعلاه بلغة بايثون الخالصة.في الواقع، قد تتمكن من العمل على بعض الاختصارات باستخدام os.pipe() و subprocess.Popen.

ومع ذلك، فمن الأسهل تفويض هذه العملية إلى الصدفة.

نصائح أخرى

import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

لمحاكاة خط أنابيب قذيفة:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

ودون الاستناد قذيفة (انظر 17.1.4.2. استبدال قذيفة خط أنابيب ):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum يوفر بعض السكر في بناء الجملة:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

ووالتناظرية من:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

وهو:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

<وأ href = "http://web.archive.org/web/20081222144252/http://www.python.org/doc/2.5.2/lib/node535.html" يختلط = "نوفولو noreferrer" > http://www.python.org/doc/2.5.2/lib/node535.html تغطية هذا جيد. هناك جزء من هذا أنك لم تفهم؟

وبرنامجك ستكون مشابهة جدا، إلا أن Popen الثانية يكون المعياري = إلى ملف، وأنك لن تحتاج إلى إخراج .communicate() لها.

مستوحاة من إجابة @ كريستيان.لقد واجهت نفس المشكلة، ولكن مع أمر مختلف.لذلك أضع المثال الذي تم اختباره، والذي أعتقد أنه قد يكون مفيدًا:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

تم اختبار هذا.

ما تم تنفيذه

  • أعلن كسول grep التنفيذ مع stdin من الأنابيب.سيتم تنفيذ هذا الأمر في ps تنفيذ الأمر عندما يتم ملء الأنبوب بـ stdout ps.
  • يسمى الأمر الأساسي ps مع stdout الموجه إلى الأنبوب الذي يستخدمه grep يأمر.
  • تواصل Grep للحصول على stdout من الأنبوب.

أنا أحب هذه الطريقة لأنها عبارة عن أنبوب طبيعي ملفوف بلطف subprocess واجهات.

والجواب المقبول ويتهرب من القضية. هنا هو مقتطف سلاسل الناتج من عمليات متعددة: لاحظ أنه يطبع أيضا (إلى حد ما) أي ما يعادل الأمر shell حتى تتمكن من تشغيلها والتأكد من أن الإخراج هو الصحيح.

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

يحرر: pipes متاح على نظام التشغيل Windows، ولكن الأهم من ذلك أنه لا يبدو كذلك في الواقع عمل على ويندوز.انظر التعليقات أدناه.

تتضمن مكتبة Python القياسية الآن pipes وحدة للتعامل مع هذا:

https://docs.python.org/2/library/pipes.html, https://docs.python.org/3.4/library/pipes.html

لست متأكدًا من المدة التي استغرقتها هذه الوحدة، ولكن يبدو أن هذا النهج أبسط إلى حد كبير من العبث به subprocess.

والأجوبة السابقة غاب عن نقطة مهمة. استبدال خط أنابيب قذيفة هو الصحيح في الأساس، كما أشار إلى بواسطة geocar. فمن <م> تقريبا كافية لتشغيل communicate على العنصر الأخير من الأنابيب.

والمشكلة المتبقية يتم تمرير البيانات على إدخال لخط الانابيب. مع فرعية متعددة، وcommunicate(input_data) بسيطة على العنصر الأخير لا يعمل - أنها معلقة إلى الأبد. تحتاج إلى إنشاء خط أنابيب وطفل يدويا مثل هذا:

import os
import subprocess

input = """\
input data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

والآن الطفل يوفر المدخلات عن طريق الأنابيب، والتواصل المكالمات الأم ()، والذي يعمل كما هو متوقع. مع هذا النهج، يمكنك إنشاء خطوط أنابيب طويلة التعسفية دون اللجوء إلى "تفويض جزء من العمل إلى قذيفة". للأسف الوثائق فرعي أو جانبي لا أذكر هذا.

وهناك طرق لتحقيق التأثير نفسه دون الأنابيب:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

والآن استخدام stdin=tf لp_awk. إنها مسألة ذوق ما كنت تفضل ذلك.

ما سبق هو ما زال ليس 100٪ أي ما يعادل سحق خطوط أنابيب لمعالجة إشارة مختلفة. يمكنك ان ترى هذا إذا قمت بإضافة عنصر أنبوب آخر باقتطاع إخراج sort، على سبيل المثال head -n 10. برمز أعلاه، سوف sort طباعة رسالة خطأ "ماسورة مكسورة" لstderr. لن تشاهد هذه الرسالة عند تشغيل نفس خط أنابيب في وعاء. (وهذا هو الفرق الوحيد على الرغم من النتيجة في stdout هو نفسه). السبب يبدو أن هذه المجموعات Popen الثعبان في SIG_IGN لSIGPIPE، في حين أن قذيفة يترك ذلك في SIG_DFL، ومعالجة إشارة sort يختلف في هاتين الحالتين.

وبالنسبة لي، فإن النهج أدناه هو أنظف وأسهل للقراءة

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

والذي يمكن أن يسمى مثل ذلك:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top