Nicht blockierende lesen auf einem subprocess.PIPE in Python
-
22-08-2019 - |
Frage
Ich bin mit dem Subprozess Modul einen Subprozess zu starten und eine Verbindung zu ihm Ausgang ist Strom (stdout). Ich möchte in der Lage, nicht-blockierend auszuführen auf seine stdout liest. Gibt es eine Möglichkeit .readline nicht-blockierenden oder zu überprüfen, um, wenn Daten auf dem Strom ist, bevor ich .readline
aufrufen? Ich möchte dieses tragbare oder zumindest arbeiten unter Windows und Linux sein.
Hier ist, wie ich es jetzt tun (es ist auf dem .readline
blockiert, wenn keine Daten verfügbar ist):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Lösung
fcntl
, select
, asyncproc
wird in diesem Fall nicht helfen.
Ein zuverlässiger Weg, um einen Strom zu lesen, ohne unabhängig vom Betriebssystem blockiert ist Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Andere Tipps
Ich habe oft ein ähnliches Problem; Python Programme, die ich häufig schreiben müssen die Möglichkeit haben, einige primäre Funktionalität auszuführen, während gleichzeitig eine Benutzereingabe über die Befehlszeile zu akzeptieren (stdin). Einfach die Benutzereingaben in einem anderen Thread Handhabung Funktionalität setzen löst nicht das Problem, weil readline()
blockiert und kein Timeout hat. Wenn die primäre Funktionalität abgeschlossen ist, und es gibt keine Notwendigkeit mehr für weitere Benutzereingabe warten, ich mein Programm normalerweise beenden will, aber es kann nicht, weil readline()
noch in dem anderen Thread für eine Schlange warten zu blockieren. Eine Lösung, die ich für dieses Problem gefunden habe, ist stdin eine nicht-blockierende Datei mit dem Fcntl Modul zu machen:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
Meiner Meinung nach ist dies ein bisschen sauberer als die Auswahl oder Signalmodulen, dieses Problem zu lösen, aber dann wieder es funktioniert nur auf UNIX ...
Python 3.4 stellt neue rel="noreferrer"> - asyncio
Modul .
Der Ansatz ist ähnlich href="https://stackoverflow.com/a/5750194/4279"> Siehe "Subprocess" in der Dokumentation . Es ist ein High-Level-Schnittstelle Jeder Schritt von timeout Sekunden begrenzt werden könnte, falls erforderlich. twisted
-basierte Antwort von @Bryan Wird
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
asyncio.create_subprocess_exec()
die Process
Objekte Das erlaubt eine Linie asynchroniosly mit StreamReader.readline()
Koroutine
(Mit async
/ await
Python 3.5 + Syntax ): #!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
führt die folgenden Aufgaben:
Versuchen Sie die asyncproc Modul. Zum Beispiel:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Das Modul kümmert sich um alle Einfädeln vorgeschlagen, wie durch S.Lott.
Sie können dies tun, wirklich leicht in Verdrehte . Je nach Ihrer bestehenden Code-Basis, könnte dies nicht so einfach zu bedienen sein, aber wenn Sie eine verdrehte Anwendung erstellen, dann Dinge wie diese fast trivial geworden. Sie erstellen eine ProcessProtocol
Klasse, und die outReceived()
Methode außer Kraft setzen. Twisted (in Abhängigkeit von dem Reaktor verwendet wird) ist in der Regel nur eine große Schleife mit select()
Rückrufe installierten Daten aus verschiedenen Datei-Deskriptoren zu handhaben (oft Netzwerk-Sockets). So ist die outReceived()
Methode einfach ist, einen Rückruf zu installieren, um von STDOUT
kommenden Datenhandling. Ein einfaches Beispiel dieses Verhalten zeigt, ist wie folgt:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Die Twisted-Dokumentation einige gute Informationen zu diesem Thema hat.
Wenn Sie Ihre gesamte Anwendung um Verdrehte zu bauen, macht es die asynchrone Kommunikation mit anderen Prozessen, lokal oder entfernt, wirklich elegant wie diese. Auf der anderen Seite, wenn Ihr Programm nicht auf Twisted gebaut wird, wird dies sein werde nicht wirklich hilfreich. Hoffentlich kann dies hilfreich sein, um andere Leser, auch wenn es nicht für Ihre Anwendung ist.
Mit auswählen und lesen (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Für Readline- () - wie:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Eine Lösung ist, um einen anderen Prozess zu machen beim Lesen des Prozesses durchzuführen, oder einen Thread des Prozesses mit einem Timeout machen.
Hier ist die Gewindeausführung einer Timeout-Funktion:
http://code.activestate.com/recipes/473878/
Allerdings müssen Sie die stdout lesen, wie es in kommt? Eine andere Lösung kann die Ausgabe in eine Datei auszugeben und für den Prozess warten zu beenden mit p.wait () .
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Disclaimer: Dies funktioniert nur für Tornado
Sie können dies tun, indem Sie die Einstellung fd nonblocking zu sein, und dann verwenden ioloop Rückrufe zu registrieren. Ich habe diese tornado_subprocess genannt in einem Ei verpackt und Sie können es über PyPI installieren:
easy_install tornado_subprocess
Jetzt können Sie etwas tun:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
Sie können es auch verwenden, um mit einem Request
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Bestehende Lösungen funktionierten nicht für mich (Details siehe unten). Was war schließlich arbeitete Readline- mit read (1) (basierend auf diese Antwort ) zu implementieren. Letzteres nicht blockiert:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Warum bestehende Lösungen nicht funktioniert hat:
- Lösungen, die Readline- (einschließlich der Queue basierte) erfordern immer blockieren. Es ist schwer (unmöglich?), Um den Thread zu töten, die Readline- ausführt. Es wird nur getötet, als der Prozess, erstellt sie beendet wird, aber nicht, wenn die Ausgänge produzierenden Prozess abgebrochen wird.
- Mischen Low-Level-Fcntl mit High-Level-Readline- Anrufen möglicherweise nicht richtig als anonnn hat darauf hingewiesen.
- Mit select.poll () ist ordentlich, aber nicht funktioniert unter Windows nach Python-Dokumentation.
- Bibliotheken von Drittanbietern verwenden scheint viel des Guten für diese Aufgabe und fügt zusätzliche Abhängigkeiten.
Diese Version von nicht-blockierenden Schreib- nicht erfordert spezielle Module und out-of-the-box auf Mehrheit der Linux-Distributionen arbeiten.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
ich hinzufügen, um dieses Problem etwas subprocess.Popen stdout zu lesen. Hier ist meine nicht blockierende Lese Lösung:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Hier ist mein Code verwendet, um jede Ausgabe von subprocess zu fangen so schnell wie möglich, einschließlich Teillinien. Es pumpt zur gleichen Zeit und stdout und stderr in fast richtigen Reihenfolge.
Geprüft und arbeitete korrekt auf Python 2.7 Linux und Windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Das Hinzufügen dieser Antwort hier, da es die Fähigkeit bietet nicht blockierende Rohre auf Windows und Unix zu setzen.
Alle ctypes
Details sind dank @ Techtonik Antwort .
Es ist eine leicht modifizierte Version sowohl auf Unix- und Windows-Systemen verwendet werden.
- Python3 kompatibel (nur geringfügige Änderung erforderlich) .
- Ist mit Posix-Version und definiert Ausnahme für entweder zu verwenden.
Auf diese Weise kann die gleiche Funktion und Ausnahme für Unix und Windows-Code verwenden kann.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
unvollständige Daten lesen zu vermeiden, landete ich meinen eigenen Readline- Generator zu schreiben (die das Byte-String für jede Zeile zurückzugibt).
Es ist ein Generator, so kann man zum Beispiel ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Ich habe das Problem des ursprünglichen Fragesteller, aber wollte nicht Threads aufzurufen. Ich mischte Jesse-Lösung mit direkter read () aus dem Rohr, und mein eigener Puffer-Handler für Zeile liest (aber mein Teilprozess - ping - immer voll Linien Der Beobachter ist Und das Hauptprogramm setzt einen Ping und ruft dann gobject Mail-Schleife. Jede andere Arbeit Rückrufe in gobject angebracht ist. def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
def watch(f, *other):
print 'reading',f.read()
return True
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Allerdings sind Sie fast immer glücklicher mit separaten Fäden. Man muss eine Blockierung der stdin lesen, ein anderer tut, wo immer es ist, dass Sie nicht blockiert werden sollen.
warum stört Thread & Warteschlange? Im Gegensatz zu Readline- (), wird nicht BufferedReader.read1 () Block für \ r \ n wartet, gibt sie so schnell wie möglich, ob es eine Ausgabe in der nächsten ist.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
In meinem Fall musste ich ein Logging-Modul, das die Ausgabe von den Anwendungen im Hintergrund fängt und erweitert es (Hinzufügen von Zeitmarken, Farben usw.).
beendete ich mit einem Hintergrundthread auf, dass die tatsächlichen I / O der Fall ist. Folgende Code ist nur für POSIX-Plattformen. Ich zog nicht wesentliche Teile.
Wenn jemand geht, dieses Tier zu verwenden, für lange Läufe betrachten offene Deskriptoren zu verwalten. In meinem Fall war es kein großes Problem.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Ich habe eine Bibliothek erstellt basierend auf J. F. Sebastian Lösung . Sie können es verwenden.
Arbeiten von J. F. Sebastian Antwort, und einigen anderen Quellen, ich habe einen einfachen subprocess Manager zusammen. Es bietet die Anfrage Lese non-blocking, sowie mehrere Prozesse parallel laufen. Es macht keinen OS-spezifischen Aufruf verwenden (die ich bewusst bin) und sollte daher überall arbeiten.
Es ist erhältlich von pypi, so dass nur pip install shelljob
. Wenden Sie sich an den Projektseite für Beispiele und vollständige Dokumente.
EDIT: Diese Implementierung immer noch blockiert. Verwenden J.F.Sebastian beantworten statt.
habe ich versucht, die Top-Antwort , aber das zusätzliche Risiko und Wartung von Thread-Code war besorgniserregend.
Blick durch den io Modul ( und begrenzt auf 2,6), fand ich BufferedReader. Das ist mein gewindelosen, nicht-blockierende Lösung.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Ich stolperte vor kurzem auf auf dem gleichen Problem Ich brauche eine Zeile zu einem Zeitpunkt aus dem Strom (tail in subprocess) zu lesen in nicht-blockierenden Modus Ich wollte nächste Probleme vermeiden: nicht cpu zu verbrennen, nicht lesen Stream um ein Byte (wie Readline- tat), etc
Hier ist meine Implementierung https://gist.github.com/grubberr/5501e1a9760c3eab5e0a es keine Fenster (Umfrage) unterstützen, nicht umgehen EOF, aber es funktioniert gut für mich
Dies ist ein Beispiel der interaktiven Befehl in subprocess zu laufen, und die stdout ist interaktiv durch Pseudo-Terminal. Sie können verweisen: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Mein Problem ist ein bisschen anders als ich Fehler- und Standardausgaben von einem laufenden Prozess, aber letztlich das gleiche sammeln wollte, da ich die Ausgabe in einem Widget als erzeugt wird.
machen wollteIch wollte nicht zu viele der vorgeschlagenen Abhilfen greifen Queues oder zusätzliche Threads verwenden, da sie nicht notwendig sein sollte, eine solche gemeinsame Aufgabe wie läuft ein anderes Skript ausführen und seine Ausgabe zu sammeln.
Nach den vorgeschlagenen Lösungen und Python-Dokumentation zu lesen aufgelöst ich mein Problem mit der Umsetzung unten. Ja, es funktioniert nur für POSIX als ich den select
Funktionsaufruf bin mit.
Ich bin damit einverstanden, dass die Dokumente sind verwirrend und die Umsetzung ist umständlich für eine solche gemeinsame scripting Aufgabe. Ich glaube, dass ältere Versionen von Python haben unterschiedliche Vorgaben für Popen
und verschiedene Erklärungen, so dass eine Menge Verwirrung geschaffen. Dies scheint für beide Python gut zu funktionieren 2.7.12 und 3.5.2.
Der Schlüssel bufsize=1
für Zeile Pufferung eingestellt war und dann universal_newlines=True
als Textdatei anstelle eines binären zu verarbeiten, die den Standard zu werden scheint, wenn bufsize=1
Einstellung.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, DEBUG und VERBOSE sind einfach Makros, die Ausgabe auf dem Terminal drucken.
Diese Lösung ist IMHO 99,99% effektiv, da es nach wie vor die Blockierung readline
Funktion verwendet, so nehmen wir an, der Teilprozess schön und gibt komplette Linien sind.
Ich begrüße Feedback die Lösung zu verbessern, wie ich zu Python noch neu bin.
Diese Lösung nutzt die select
Modul aus einem IO-Strom „alle verfügbaren Daten lesen“. Diese Funktion blockiert zunächst bis Daten verfügbar sind, aber dann liest nur die Daten, die zur Verfügung steht und nicht blockiert weiter.
In Anbetracht der Tatsache, dass es das select
Modul verwendet, das funktioniert nur auf Unix.
Der Code ist voll PEP8-kompatibel ist.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Ich sah sich auch das Problem von Jesse beschrieben und löste es mit den "Select": Bradley , Andy und andere haben aber in einem Sperrmodus eine Besetzt Schleife zu vermeiden. Es verwendet ein Dummy-Rohr als Fälschung stdin. Die ausgewählten Blöcke und warten entweder stdin oder das Rohr bereit zu sein. Wenn eine Taste gedrückt wird stdin deblockiert die Auswahl und der Schlüsselwert kann mit Lese (1) abgerufen werden. Wenn ein anderer Thread auf das Rohr schreibt dann entblockiert das Rohr den auswählen und kann als eine Anzeige genommen werden, dass die Notwendigkeit für stdin vorbei ist. Hier finden Sie einige Referenzcode:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Die Dinge sind viel besser in der modernen Python.
Hier ist ein einfaches Kind-Programm "hello.py":
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
Und ein Programm, um mit ihm interagieren:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
Das ausdruckt:
b'hello bob\n'
b'hello alice\n'
Hinweis , dass die tatsächlichen Muster, die auch all bisherigen Antworten von fast ist, sowohl hier als auch in verwandten Fragen, sind das Kind stdout Dateideskriptor nicht blockierend zu setzen und sie dann in einer Art select-Schleife abfragen. In diesen Tagen natürlich, dass Schleife durch asyncio vorgesehen ist.
Hier ist ein Modul, das nicht-blockierend unterstützt liest und schreibt Hintergrund in Python:
https://pypi.python.org/pypi/python-nonblock
Stellt eine Funktion,
nonblock_read, die Daten aus dem Stream, falls vorhanden lesen, andernfalls eine leere Zeichenfolge zurück (oder None, wenn der Strom auf der anderen Seite geschlossen ist und alle möglichen Daten gelesen wurden)
Sie können auch das Python-Modul subprocess2 betrachten,
https://pypi.python.org/pypi/python-subprocess2
das fügt das Subprozess-Modul. So auf dem Objekt von „subprocess.Popen“ zurückgegeben wird, eine zusätzliche Methode, runInBackground hinzugefügt. Dies startet einen Thread und gibt ein Objekt, das automatisch als Material aufgefüllt wird, wird auf stdout / stderr geschrieben, ohne die Haupt-Thread zu blockieren.
Genießen Sie!