Non blocca la lettura di un sottoprocesso.TUBO in python
-
22-08-2019 - |
Domanda
Sto utilizzando il sottoprocesso modulo per avviare un processo secondario e collegare ad esso il flusso di output (stdout).Voglio essere in grado di eseguire non-blocking legge sul suo stdout.C'è un modo per fare .readline non-blocking o per controllare se ci sono dati sul flusso prima di richiamare .readline
?Mi piacerebbe questo, per essere portatile, o almeno lavorare sotto Windows che sotto Linux.
ecco come fare per ora (È il blocco sul .readline
se non ci sono dati disponibili):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Soluzione
fcntl
, select
, asyncproc
non aiuterà in questo caso.
Un modo affidabile per leggere un flusso senza bloccare indipendentemente dal sistema operativo è usare Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Altri suggerimenti
Più volte ho avuto un problema simile; programmi Python Scrivo spesso necessario avere la capacità di eseguire alcune funzionalità primaria e contemporaneamente accettare input dell'utente dalla riga di comando (stdin). Semplicemente mettendo la funzionalità movimentazione input dell'utente in un altro filo non risolve il problema perché blocchi readline()
e non ha timeout. Se la funzionalità principale è completo e non v'è più alcuna necessità di attendere ulteriori input dell'utente Io di solito voglio che il mio programma per uscire, ma non può perché readline()
è ancora bloccando in altro thread in attesa di una linea. Una soluzione che ho trovato a questo problema è quello di rendere un file stdin non bloccante utilizzando il modulo fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
A mio parere questo è un po 'più pulita rispetto all'utilizzo dei moduli di selezione o di segnale per risolvere questo problema, ma poi di nuovo funziona solo su UNIX ...
Python 3.4 introduce la nuova provvisorio API per asincrona IO - modulo asyncio
.
L'approccio è simile a risposta twisted
-based @Bryan Ward - definire un protocollo ei suoi metodi sono chiamati non appena i dati sono pronti:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
"sottoprocesso" nella documentazione .
C'è un'interfaccia asyncio.create_subprocess_exec()
di alto livello che restituisce Process
oggetti che permette di leggere una riga asynchroniosly utilizzando StreamReader.readline()
coroutine
(Con async
/ await
Python 3.5+ sintassi ):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
esegue le seguenti attività:
- avviare sottoprocesso, riorientare la sua stdout ad un tubo
- leggere una linea da sottoprocesso stdout in modo asincrono
- uccidere sottoprocesso
- attendere che per uscire
Ogni passo potrebbe essere limitata da timeout secondi se necessario.
Prova il asyncproc modulo. Ad esempio:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
Il modulo si prende cura di tutto il threading come suggerito da S. Lott.
È possibile farlo molto facilmente in ritorto . A seconda della vostra base di codice esistente, questo potrebbe non essere così facile da usare, ma se si sta costruendo un'applicazione contorto, quindi le cose come questa diventato quasi banale. Si crea una classe ProcessProtocol
, e l'override del metodo outReceived()
. Ritorto (a seconda del reattore utilizzato) è di solito solo un ciclo grande select()
con richiamate installati per gestire i dati provenienti da diversi descrittori di file (spesso prese di rete). Quindi il metodo outReceived()
è sufficiente installare un callback per la gestione di dati provenienti da STDOUT
. Un semplice esempio che dimostra questo comportamento è la seguente:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
Il documentazione ritorto ha alcune buone informazioni su questo.
Se si costruisce l'intera applicazione intorno Contorto, rende la comunicazione asincrona con altri processi, locali o remoti, davvero eleganti come questo. D'altra parte, se il programma non è costruito sulla base di Twisted, questo non è davvero essere che utile. Speriamo che questo può essere utile per gli altri lettori, anche se non è applicabile per una particolare applicazione.
Usa selezionare e leggere (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Per readline () - come:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Una soluzione è quella di fare un altro processo per eseguire la lettura del processo, o fare un thread del processo con un timeout.
Ecco la versione filettata di una funzione di timeout:
http://code.activestate.com/recipes/473878/
Tuttavia, non è necessario leggere lo stdout come è in arrivo? Un'altra soluzione potrebbe essere quella di scaricare l'output in un file e attendere il termine del processo utilizzando p.wait () .
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Esonero di responsabilità: questo funziona solo per i tornado
È possibile farlo impostando il fd di essere non bloccante e quindi utilizzare ioloop per registrare callback. Ho confezionato questo in un uovo chiamato tornado_subprocess e è possibile installarlo tramite PyPI:
easy_install tornado_subprocess
ora si può fare qualcosa di simile:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
è possibile anche usarlo con un RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Le soluzioni esistenti non hanno funzionato per me (dettagli sotto). Ciò che alla fine ha lavorato è stato quello di implementare readline usando lettura (1) (sulla base di questa risposta ). Quest'ultimo non blocca:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Perché le soluzioni esistenti non hanno funzionato:
- Soluzioni che richiedono readline (compresi quelli a base di coda) sempre bloccare. E 'difficile (impossibile?) Per uccidere il thread che esegue readline. E 'solo viene ucciso quando il processo che lo ha creato finisce, ma non quando il processo di output che producono viene ucciso.
- Mixing fcntl a basso livello con chiamate readline di alto livello potrebbe non funzionare correttamente come anonnn ha sottolineato.
- Uso select.poll () è pulito, ma non funziona su Windows in base alla documentazione Python.
- Utilizzo di librerie di terze parti sembra eccessivo per questo compito e aggiunge ulteriori dipendenze.
Questa versione di non-blocking leggere non richiedere moduli speciali e lavorerà out-of-the-box sulla maggior parte delle distribuzioni Linux.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
aggiungo questo problema di leggere alcuni subprocess.Popen stdout. Ecco la mia soluzione non bloccante lettura:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Ecco il mio codice, utilizzato per la cattura di ogni uscita da sottoprocesso ASAP, comprese le linee parziali. Esso pompe allo stesso tempo e stdout e stderr per quasi corretto.
Testato e correttamente lavorato su Python 2.7 Linux e Windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
L'aggiunta di questa risposta qui in quanto fornisce la capacità di impostare i tubi non bloccanti su Windows e Unix.
Tutti i dettagli sono ctypes
grazie a @ di techtonik risposta .
C'è una versione leggermente modificata per essere utilizzato sia su sistemi Unix e Windows.
- python3 compatibile (solo piccola modifica necessaria) .
- include la versione POSIX, e definisce eccezioni da utilizzare per entrambi.
In questo modo è possibile utilizzare la stessa funzione e l'eccezione per il codice Unix e Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Per evitare la lettura di dati incompleti, ho finito per scrivere il mio generatore di readline (che restituisce la stringa di byte per ogni linea).
E 'un generatore in modo da poter, ad esempio ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Non ho il problema del interrogante originale, ma non ho voluto richiamare le discussioni. Ho mescolato la soluzione di Jesse con una lettura diretta () dal tubo, e il mio proprio buffer-handler per riga si legge (tuttavia, il mio sub-processo - ping - sempre scritto linee complete L'osservatore è E il programma principale imposta un ping e quindi chiama loop di posta gobject. Qualsiasi altra opera è collegata al callback in gobject. def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
def watch(f, *other):
print 'reading',f.read()
return True
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
Il selezionare modulo consente di determinare dove il prossimo ingresso utile è.
Tuttavia, si è quasi sempre più felice con thread separati. Uno non un blocco lettura stdin, un altro fa dovunque sia che non si desidera bloccato.
Perché preoccuparsi filo e la fila? a differenza di readline (), BufferedReader.read1 () wont blocco in attesa di \ r \ n, restituisce ASAP se c'è qualche uscita in arrivo.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
Nel mio caso avevo bisogno di un modulo di registrazione che cattura l'uscita dalle applicazioni in background e aumenta esso (l'aggiunta di time-stamp, colori, ecc.).
Ho finito con un thread in background che fa l'attuale di I / O. A seguito di codice è solo per le piattaforme POSIX. Ho messo a nudo le parti non essenziali.
Se qualcuno sta per utilizzare questa bestia per long run considerano la gestione di descrittori aperti. Nel mio caso non è stato un grosso problema.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Ho creato una libreria basata su J. soluzione di F. Sebastian. Si può usare.
Lavorare da risposta di J.F. Sebastian, e diverse altre fonti, ho messo insieme un semplice gestore di sottoprocesso. Esso fornisce la lettura non bloccante richiesta, così come l'esecuzione di più processi in parallelo. Non usa alcuna chiamata specifico sistema operativo (che io sappia) e quindi dovrebbe lavorare ovunque.
E 'disponibile da PyPI, quindi basta pip install shelljob
. Fare riferimento alla pagina per esempi e documentazione completa.
EDIT: Questa implementazione ancora blocchi. rispondere .
ho provato il risposta superiore , ma il rischio e la manutenzione del codice thread aggiuntivo era preoccupante.
Guardando attraverso il io modulo ( e di essere limitata a 2,6), ho trovato BufferedReader. Questo è il mio Threadless, una soluzione non-blocking.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Recentemente ho incappato nello stesso problema Ho bisogno di leggere una riga alla volta dal flusso (coda eseguito in sottoprocesso) in modo non bloccante Volevo evitare successivi problemi: non bruciare cpu, non leggono flusso da un byte (come readline ha fatto), etc
Ecco la mia implementazione https://gist.github.com/grubberr/5501e1a9760c3eab5e0a che non supportano le finestre (sondaggio), non gestiscono EOF, ma per me funziona bene
Questo è un esempio di eseguire il comando interattiva in sottoprocesso, e stdout è interattiva utilizzando pseudo terminale. Si può fare riferimento a: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Il mio problema è un po ' diversa, come ho voluto raccogliere sia stdout e stderr da un processo in esecuzione, ma in definitiva la stessa da quando ho voluto rendere l'output in un widget come la sua generato.
Non volevo ricorrere a molte delle proposte di soluzioni alternative utilizzando Code o ulteriori Thread come questi non dovrebbe essere necessario eseguire un compito comune, come l'esecuzione di un altro script e raccogliendo la sua uscita.
Dopo aver letto la proposta di soluzioni e python-docs ho risolto il mio problema con l'implementazione di seguito.Sì, ma funziona solo per POSIX come sto usando il select
chiamata di funzione.
Sono d'accordo che i documenti sono confuso e l'implementazione è a disagio per il comune di scripting attività.Credo che le vecchie versioni di python hanno diversi valori predefiniti per Popen
e diverse spiegazioni così che ha creato un sacco di confusione.Questo sembra funzionare bene sia per Python 2.7.12 e 3.5.2.
La chiave è stato quello di impostare bufsize=1
per la linea di buffering e poi universal_newlines=True
a processo come un file di testo invece di un binario che sembra diventare l'impostazione predefinita quando bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
Di ERRORE, eseguire il DEBUG e DETTAGLIATO, sono semplicemente le macro stampare l'output del terminale.
Questa soluzione è IMHO il 99,99% di efficacia in quanto utilizza ancora il blocco readline
funzione, quindi si suppone che il processo di sub è bello e uscite di linee complete.
Accolgo con favore il feedback per migliorare la soluzione, io sono ancora nuovo a Python.
Questa soluzione utilizza il modulo select
di "leggere tutti i dati disponibili" da un flusso di IO. Questa funzione blocca inizialmente fino sono disponibili i dati, ma poi legge solo i dati che è disponibile e non bloccare ulteriormente.
Dato il fatto che utilizza il modulo select
, questo funziona solo su Unix.
Il codice è completamente PEP8-compliant.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Ho anche affrontato il problema descritto da Jesse e risolto utilizzando "seleziona" il Bradley , Andy altri hanno fatto, ma in una modalità di blocco per evitare un ciclo occupato. Esso utilizza un tubo fittizio come stdin falso. I blocchi di selezione e attendere che sia stdin o il tubo per essere pronti. stdin quando viene premuto un tasto sblocca la selezione e il valore di chiave può essere recuperato con lettura (1). Quando un thread diverso scrive al tubo poi il tubo sblocca il prescelto e può essere considerato come un'indicazione che la necessità di stdin è finita. Ecco il codice di riferimento:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Le cose vanno molto meglio in Python moderna.
Ecco un semplice programma di bambino "hello.py":
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
E un programma di interagire con esso:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
che stampa:
b'hello bob\n'
b'hello alice\n'
Si noti che l'andamento reale, che è anche da quasi tutte le risposte precedenti, sia qui che in questioni correlate, è quello di impostare descrittore di file stdout del bambino a non bloccante e poi sondaggio è in una sorta di selezionare loop. In questi giorni, ovviamente, quel ciclo è fornito da asyncio.
Qui è un modulo che supporta non bloccante legge e sfondo scrive in Python:
https://pypi.python.org/pypi/python-nonblock
fornisce una funzione,
nonblock_read che leggere dati dal flusso, se disponibile, altrimenti restituisce una stringa vuota (o None se il flusso è chiuso sul lato opposto e tutti i dati possibili sono stati letti)
Si può anche prendere in considerazione il modulo python-subprocess2,
https://pypi.python.org/pypi/python-subprocess2
che aggiunge al modulo sottoprocesso. Così via l'oggetto restituito da "subprocess.Popen" viene aggiunto un ulteriore metodo, runInBackground. Questo avvia un thread e restituisce un oggetto che verrà automaticamente popolato come roba viene scritto output / error, senza bloccare il thread principale.
Enjoy!