Frage

Ich bin mit dem Subprozess Modul einen Subprozess zu starten und eine Verbindung zu ihm Ausgang ist Strom (stdout). Ich möchte in der Lage, nicht-blockierend auszuführen auf seine stdout liest. Gibt es eine Möglichkeit .readline nicht-blockierenden oder zu überprüfen, um, wenn Daten auf dem Strom ist, bevor ich .readline aufrufen? Ich möchte dieses tragbare oder zumindest arbeiten unter Windows und Linux sein.

Hier ist, wie ich es jetzt tun (es ist auf dem .readline blockiert, wenn keine Daten verfügbar ist):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
War es hilfreich?

Lösung

fcntl , select , asyncproc wird in diesem Fall nicht helfen.

Ein zuverlässiger Weg, um einen Strom zu lesen, ohne unabhängig vom Betriebssystem blockiert ist Queue.get_nowait() :

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Andere Tipps

Ich habe oft ein ähnliches Problem; Python Programme, die ich häufig schreiben müssen die Möglichkeit haben, einige primäre Funktionalität auszuführen, während gleichzeitig eine Benutzereingabe über die Befehlszeile zu akzeptieren (stdin). Einfach die Benutzereingaben in einem anderen Thread Handhabung Funktionalität setzen löst nicht das Problem, weil readline() blockiert und kein Timeout hat. Wenn die primäre Funktionalität abgeschlossen ist, und es gibt keine Notwendigkeit mehr für weitere Benutzereingabe warten, ich mein Programm normalerweise beenden will, aber es kann nicht, weil readline() noch in dem anderen Thread für eine Schlange warten zu blockieren. Eine Lösung, die ich für dieses Problem gefunden habe, ist stdin eine nicht-blockierende Datei mit dem Fcntl Modul zu machen:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

Meiner Meinung nach ist dies ein bisschen sauberer als die Auswahl oder Signalmodulen, dieses Problem zu lösen, aber dann wieder es funktioniert nur auf UNIX ...

Python 3.4 stellt neue rel="noreferrer"> - asyncio Modul .

Der Ansatz ist ähnlich href="https://stackoverflow.com/a/5750194/4279"> twisted-basierte Antwort von @Bryan Wird

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Siehe "Subprocess" in der Dokumentation .

Es ist ein High-Level-Schnittstelle asyncio.create_subprocess_exec() die Process Objekte Das erlaubt eine Linie asynchroniosly mit StreamReader.readline() Koroutine (Mit async / await Python 3.5 + Syntax ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() führt die folgenden Aufgaben:

  • starten subprocess, leiten seine stdout an einem Rohr
  • lesen Sie eine Linie von subprocess' stdout asynchron
  • töten subprocess
  • warten, bis es zu beenden

Jeder Schritt von timeout Sekunden begrenzt werden könnte, falls erforderlich.

Versuchen Sie die asyncproc Modul. Zum Beispiel:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Das Modul kümmert sich um alle Einfädeln vorgeschlagen, wie durch S.Lott.

Sie können dies tun, wirklich leicht in Verdrehte . Je nach Ihrer bestehenden Code-Basis, könnte dies nicht so einfach zu bedienen sein, aber wenn Sie eine verdrehte Anwendung erstellen, dann Dinge wie diese fast trivial geworden. Sie erstellen eine ProcessProtocol Klasse, und die outReceived() Methode außer Kraft setzen. Twisted (in Abhängigkeit von dem Reaktor verwendet wird) ist in der Regel nur eine große Schleife mit select() Rückrufe installierten Daten aus verschiedenen Datei-Deskriptoren zu handhaben (oft Netzwerk-Sockets). So ist die outReceived() Methode einfach ist, einen Rückruf zu installieren, um von STDOUT kommenden Datenhandling. Ein einfaches Beispiel dieses Verhalten zeigt, ist wie folgt:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Die Twisted-Dokumentation einige gute Informationen zu diesem Thema hat.

Wenn Sie Ihre gesamte Anwendung um Verdrehte zu bauen, macht es die asynchrone Kommunikation mit anderen Prozessen, lokal oder entfernt, wirklich elegant wie diese. Auf der anderen Seite, wenn Ihr Programm nicht auf Twisted gebaut wird, wird dies sein werde nicht wirklich hilfreich. Hoffentlich kann dies hilfreich sein, um andere Leser, auch wenn es nicht für Ihre Anwendung ist.

Mit auswählen und lesen (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Für Readline- () - wie:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Eine Lösung ist, um einen anderen Prozess zu machen beim Lesen des Prozesses durchzuführen, oder einen Thread des Prozesses mit einem Timeout machen.

Hier ist die Gewindeausführung einer Timeout-Funktion:

http://code.activestate.com/recipes/473878/

Allerdings müssen Sie die stdout lesen, wie es in kommt? Eine andere Lösung kann die Ausgabe in eine Datei auszugeben und für den Prozess warten zu beenden mit p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

Disclaimer: Dies funktioniert nur für Tornado

Sie können dies tun, indem Sie die Einstellung fd nonblocking zu sein, und dann verwenden ioloop Rückrufe zu registrieren. Ich habe diese tornado_subprocess genannt in einem Ei verpackt und Sie können es über PyPI installieren:

easy_install tornado_subprocess

Jetzt können Sie etwas tun:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

Sie können es auch verwenden, um mit einem Request

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Bestehende Lösungen funktionierten nicht für mich (Details siehe unten). Was war schließlich arbeitete Readline- mit read (1) (basierend auf diese Antwort ) zu implementieren. Letzteres nicht blockiert:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Warum bestehende Lösungen nicht funktioniert hat:

  1. Lösungen, die Readline- (einschließlich der Queue basierte) erfordern immer blockieren. Es ist schwer (unmöglich?), Um den Thread zu töten, die Readline- ausführt. Es wird nur getötet, als der Prozess, erstellt sie beendet wird, aber nicht, wenn die Ausgänge produzierenden Prozess abgebrochen wird.
  2. Mischen Low-Level-Fcntl mit High-Level-Readline- Anrufen möglicherweise nicht richtig als anonnn hat darauf hingewiesen.
  3. Mit select.poll () ist ordentlich, aber nicht funktioniert unter Windows nach Python-Dokumentation.
  4. Bibliotheken von Drittanbietern verwenden scheint viel des Guten für diese Aufgabe und fügt zusätzliche Abhängigkeiten.

Diese Version von nicht-blockierenden Schreib- nicht erfordert spezielle Module und out-of-the-box auf Mehrheit der Linux-Distributionen arbeiten.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

ich hinzufügen, um dieses Problem etwas subprocess.Popen stdout zu lesen. Hier ist meine nicht blockierende Lese Lösung:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Hier ist mein Code verwendet, um jede Ausgabe von subprocess zu fangen so schnell wie möglich, einschließlich Teillinien. Es pumpt zur gleichen Zeit und stdout und stderr in fast richtigen Reihenfolge.

Geprüft und arbeitete korrekt auf Python 2.7 Linux und Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

Das Hinzufügen dieser Antwort hier, da es die Fähigkeit bietet nicht blockierende Rohre auf Windows und Unix zu setzen.

Alle ctypes Details sind dank @ Techtonik Antwort .

Es ist eine leicht modifizierte Version sowohl auf Unix- und Windows-Systemen verwendet werden.

  • Python3 kompatibel (nur geringfügige Änderung erforderlich) .
  • Ist mit Posix-Version und definiert Ausnahme für entweder zu verwenden.

Auf diese Weise kann die gleiche Funktion und Ausnahme für Unix und Windows-Code verwenden kann.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

unvollständige Daten lesen zu vermeiden, landete ich meinen eigenen Readline- Generator zu schreiben (die das Byte-String für jede Zeile zurückzugibt).

Es ist ein Generator, so kann man zum Beispiel ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Ich habe das Problem des ursprünglichen Fragesteller, aber wollte nicht Threads aufzurufen. Ich mischte Jesse-Lösung mit direkter read () aus dem Rohr, und mein eigener Puffer-Handler für Zeile liest (aber mein Teilprozess - ping - immer voll Linien

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

Der Beobachter ist

def watch(f, *other):
print 'reading',f.read()
return True

Und das Hauptprogramm setzt einen Ping und ruft dann gobject Mail-Schleife.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Jede andere Arbeit Rückrufe in gobject angebracht ist.

warum stört Thread & Warteschlange? Im Gegensatz zu Readline- (), wird nicht BufferedReader.read1 () Block für \ r \ n wartet, gibt sie so schnell wie möglich, ob es eine Ausgabe in der nächsten ist.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

In meinem Fall musste ich ein Logging-Modul, das die Ausgabe von den Anwendungen im Hintergrund fängt und erweitert es (Hinzufügen von Zeitmarken, Farben usw.).

beendete ich mit einem Hintergrundthread auf, dass die tatsächlichen I / O der Fall ist. Folgende Code ist nur für POSIX-Plattformen. Ich zog nicht wesentliche Teile.

Wenn jemand geht, dieses Tier zu verwenden, für lange Läufe betrachten offene Deskriptoren zu verwalten. In meinem Fall war es kein großes Problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

Arbeiten von J. F. Sebastian Antwort, und einigen anderen Quellen, ich habe einen einfachen subprocess Manager zusammen. Es bietet die Anfrage Lese non-blocking, sowie mehrere Prozesse parallel laufen. Es macht keinen OS-spezifischen Aufruf verwenden (die ich bewusst bin) und sollte daher überall arbeiten.

Es ist erhältlich von pypi, so dass nur pip install shelljob. Wenden Sie sich an den Projektseite für Beispiele und vollständige Dokumente.

EDIT: Diese Implementierung immer noch blockiert. Verwenden J.F.Sebastian beantworten statt.

habe ich versucht, die Top-Antwort , aber das zusätzliche Risiko und Wartung von Thread-Code war besorgniserregend.

Blick durch den io Modul ( und begrenzt auf 2,6), fand ich BufferedReader. Das ist mein gewindelosen, nicht-blockierende Lösung.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Ich stolperte vor kurzem auf auf dem gleichen Problem Ich brauche eine Zeile zu einem Zeitpunkt aus dem Strom (tail in subprocess) zu lesen in nicht-blockierenden Modus Ich wollte nächste Probleme vermeiden: nicht cpu zu verbrennen, nicht lesen Stream um ein Byte (wie Readline- tat), etc

Hier ist meine Implementierung https://gist.github.com/grubberr/5501e1a9760c3eab5e0a es keine Fenster (Umfrage) unterstützen, nicht umgehen EOF, aber es funktioniert gut für mich

Dies ist ein Beispiel der interaktiven Befehl in subprocess zu laufen, und die stdout ist interaktiv durch Pseudo-Terminal. Sie können verweisen: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Mein Problem ist ein bisschen anders als ich Fehler- und Standardausgaben von einem laufenden Prozess, aber letztlich das gleiche sammeln wollte, da ich die Ausgabe in einem Widget als erzeugt wird.

machen wollte

Ich wollte nicht zu viele der vorgeschlagenen Abhilfen greifen Queues oder zusätzliche Threads verwenden, da sie nicht notwendig sein sollte, eine solche gemeinsame Aufgabe wie läuft ein anderes Skript ausführen und seine Ausgabe zu sammeln.

Nach den vorgeschlagenen Lösungen und Python-Dokumentation zu lesen aufgelöst ich mein Problem mit der Umsetzung unten. Ja, es funktioniert nur für POSIX als ich den select Funktionsaufruf bin mit.

Ich bin damit einverstanden, dass die Dokumente sind verwirrend und die Umsetzung ist umständlich für eine solche gemeinsame scripting Aufgabe. Ich glaube, dass ältere Versionen von Python haben unterschiedliche Vorgaben für Popen und verschiedene Erklärungen, so dass eine Menge Verwirrung geschaffen. Dies scheint für beide Python gut zu funktionieren 2.7.12 und 3.5.2.

Der Schlüssel bufsize=1 für Zeile Pufferung eingestellt war und dann universal_newlines=True als Textdatei anstelle eines binären zu verarbeiten, die den Standard zu werden scheint, wenn bufsize=1 Einstellung.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG und VERBOSE sind einfach Makros, die Ausgabe auf dem Terminal drucken.

Diese Lösung ist IMHO 99,99% effektiv, da es nach wie vor die Blockierung readline Funktion verwendet, so nehmen wir an, der Teilprozess schön und gibt komplette Linien sind.

Ich begrüße Feedback die Lösung zu verbessern, wie ich zu Python noch neu bin.

Diese Lösung nutzt die select Modul aus einem IO-Strom „alle verfügbaren Daten lesen“. Diese Funktion blockiert zunächst bis Daten verfügbar sind, aber dann liest nur die Daten, die zur Verfügung steht und nicht blockiert weiter.

In Anbetracht der Tatsache, dass es das select Modul verwendet, das funktioniert nur auf Unix.

Der Code ist voll PEP8-kompatibel ist.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

Ich sah sich auch das Problem von Jesse beschrieben und löste es mit den "Select": Bradley , Andy und andere haben aber in einem Sperrmodus eine Besetzt Schleife zu vermeiden. Es verwendet ein Dummy-Rohr als Fälschung stdin. Die ausgewählten Blöcke und warten entweder stdin oder das Rohr bereit zu sein. Wenn eine Taste gedrückt wird stdin deblockiert die Auswahl und der Schlüsselwert kann mit Lese (1) abgerufen werden. Wenn ein anderer Thread auf das Rohr schreibt dann entblockiert das Rohr den auswählen und kann als eine Anzeige genommen werden, dass die Notwendigkeit für stdin vorbei ist. Hier finden Sie einige Referenzcode:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

Die Dinge sind viel besser in der modernen Python.

Hier ist ein einfaches Kind-Programm "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

Und ein Programm, um mit ihm interagieren:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

Das ausdruckt:

b'hello bob\n'
b'hello alice\n'
Hinweis

, dass die tatsächlichen Muster, die auch all bisherigen Antworten von fast ist, sowohl hier als auch in verwandten Fragen, sind das Kind stdout Dateideskriptor nicht blockierend zu setzen und sie dann in einer Art select-Schleife abfragen. In diesen Tagen natürlich, dass Schleife durch asyncio vorgesehen ist.

Hier ist ein Modul, das nicht-blockierend unterstützt liest und schreibt Hintergrund in Python:

https://pypi.python.org/pypi/python-nonblock

Stellt eine Funktion,

nonblock_read, die Daten aus dem Stream, falls vorhanden lesen, andernfalls eine leere Zeichenfolge zurück (oder None, wenn der Strom auf der anderen Seite geschlossen ist und alle möglichen Daten gelesen wurden)

Sie können auch das Python-Modul subprocess2 betrachten,

https://pypi.python.org/pypi/python-subprocess2

das fügt das Subprozess-Modul. So auf dem Objekt von „subprocess.Popen“ zurückgegeben wird, eine zusätzliche Methode, runInBackground hinzugefügt. Dies startet einen Thread und gibt ein Objekt, das automatisch als Material aufgefüllt wird, wird auf stdout / stderr geschrieben, ohne die Haupt-Thread zu blockieren.

Genießen Sie!

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top