Sin bloqueo leer en un subproceso.TUBERÍA en python
-
22-08-2019 - |
Pregunta
Estoy usando el módulo subprocess para iniciar un subproceso y conectarse a la secuencia de salida (stdout).Quiero ser capaz de ejecutar el bloqueo no lee en su stdout.Es allí una manera de hacer .readline sin bloqueo o para comprobar si hay datos en la secuencia antes de invoco .readline
?Me gustaría que este para ser portátil o al menos trabajar en Windows y Linux.
aquí es cómo yo lo hago por ahora (Es el bloqueo en el .readline
si no hay datos disponibles):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Solución
fcntl
, select
, asyncproc
no ayudará en este caso.
Una forma confiable de leer una secuencia sin bloquear independientemente del sistema operativo es utilizar Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Otros consejos
A menudo he tenido un problema similar; programas de Python que escribo con frecuencia es necesario tener la capacidad de ejecutar algunas funciones primaria, mientras que al mismo tiempo aceptar la entrada del usuario desde la línea de comandos (entrada estándar). Basta con poner la funcionalidad de manejo de la entrada del usuario en otro hilo no resuelve el problema, ya que los bloques readline()
y no tiene tiempo de espera. Si la funcionalidad principal está completo y ya no hay ninguna necesidad de esperar más entradas de usuario que normalmente quiero que mi programa para salir, pero no puede porque readline()
se sigue bloqueando en el otro hilo a la espera de una línea. Una solución que he encontrado a este problema es hacer un archivo de entrada estándar sin bloqueo mediante el módulo fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
En mi opinión, esto es un poco más limpio que el uso de los módulos de selección o de señal para resolver este problema pero de nuevo, sólo funciona en UNIX ...
Python 3.4 presenta nuevo provisional API para asíncrono IO - módulo asyncio
.
El enfoque es similar a de respuesta basado en twisted
por @Bryan Ward, - definir un protocolo y sus métodos se llaman tan pronto como los datos están listos:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
"subproceso" en la documentación .
Hay una asyncio.create_subprocess_exec()
interfaz de alto nivel que devuelve Process
objetos que permite leer una línea asynchroniosly usando StreamReader.readline()
corrutina
(Con async
/ await
Python 3.5+ sintaxis ):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
realiza las siguientes tareas:
- iniciar el subproceso, redirigir su salida estándar a un tubo
- leer una línea del subproceso stdout asíncrona
- matar subproceso
- esperar a que salga
Cada paso podría estar limitada por segundo de tiempo de espera si es necesario.
asyncproc módulo. Por ejemplo:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
El módulo se encarga de todo el roscado según lo sugerido por S. Lott.
Esto se puede hacer muy fácilmente en Twisted . Dependiendo de su base de código existente, esto podría no ser tan fácil de usar, pero si usted está construyendo una aplicación torcida, entonces este tipo de cosas convertido casi trivial. Se crea una clase ProcessProtocol
, y reemplaza el método outReceived()
. Twisted (dependiendo del reactor utilizado) es por lo general sólo un bucle grande select()
con devoluciones de llamada instalados para manejar los datos de diferentes descriptores de fichero (con frecuencia conectores de red). Por lo que el método outReceived()
es simplemente instalando una devolución de llamada para el manejo de los datos procedentes de STDOUT
. Un simple ejemplo que demuestra este comportamiento es como sigue:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
El documentación Twisted tiene una buena información sobre esto.
Si se construye toda su aplicación alrededor de trenzado, que hace que la comunicación asíncrona con otros procesos, locales o remotas, muy elegantes como este. Por otro lado, si el programa no se construye en la parte superior de trenzado, esto no es realmente va a ser de mucha ayuda. Espero que esto puede ser útil para otros lectores, aunque no es aplicable para su aplicación en particular.
Uso seleccionar y leer (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Para readline () - como:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Una solución es hacer otro proceso para realizar su lectura del proceso, o hacer un hilo del proceso con un tiempo de espera.
Aquí está la versión roscada de una función de tiempo de espera:
http://code.activestate.com/recipes/473878/
Sin embargo, no es necesario leer la salida estándar, ya que viene en? Otra solución puede ser para volcar el resultado en un archivo y esperar a que finalice el proceso usando p.wait () .
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
exención de responsabilidad: esto sólo funciona para un tornado
Usted puede hacer esto mediante el establecimiento de la fd ser sin bloqueo y luego usar ioloop para registrar las devoluciones de llamada. He empaquetado esto en un huevo llamado tornado_subprocess y puede instalarlo a través de PyPI:
easy_install tornado_subprocess
Ahora usted puede hacer algo como esto:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
También se puede utilizar con un RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
Las soluciones existentes no me funciona (la información a continuación).Lo que finalmente se trabajó fue implementar readline usando read(1) (basado en esta respuesta).El último no cuadra:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
¿Por qué las soluciones existentes no funciona:
- Las soluciones que requieren readline (incluyendo la Cola de base) siempre en bloque.Es difícil (¿imposible?) para matar el subproceso que ejecuta readline.Sólo se mató cuando el proceso que lo creó acabados, pero no cuando la salida-proceso de producción es asesinado.
- Mezcla de bajo nivel fcntl con un alto nivel de readline las llamadas pueden no funcionar correctamente como anonnn ha señalado.
- El uso de select.poll() es limpio, pero no funciona en el Windows de acuerdo a python docs.
- El uso de librerías de terceros parece excesivo para esta tarea y agrega dependencias adicionales.
Esta versión de la no-bloqueo de lectura no requerir módulos especiales y trabajará fuera de la caja en la mayoría de las distribuciones de Linux.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
agrego este problema para leer algunos stdout subprocess.Popen. Aquí está mi solución no lectura de bloqueo:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Aquí está mi código, para la captura de cada salida del subproceso lo antes posible, incluyendo las líneas parciales. Bombea al mismo tiempo y stdout y stderr en orden casi correcta.
Probado y correctamente trabajado en Python 2.7 Linux y Windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
La adición de esta respuesta aquí, ya que proporciona capacidad de fijar tuberías no bloqueantes en Windows y Unix.
Todos los detalles ctypes
son gracias a @ techtonik de respuesta .
Hay una versión ligeramente modificados para ser utilizados tanto en los sistemas Unix y Windows.
- compatibles python3 (sólo cambios menores necesario) .
- Incluye versión posix, y define excepción de usar para cualquiera.
De esta manera se puede utilizar la misma función y una excepción para el código de Unix y Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Para evitar la lectura de datos incompletos, terminé de escribir mi propio generador de readline (que devuelve la cadena de bytes para cada línea).
Es un generador para que pueda, por ejemplo ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Tengo un problema del que pregunta original, pero no deseo invocar hilos. He mezclado solución de Jesse con una lectura directa () de la tubería, y mi propio buffer-manejador de línea se lee (sin embargo, mi sub-proceso - de ping - siempre escribió líneas completas El observador es Y el programa principal establece un ping y luego llama bucle de correo gobject. Cualquier otro trabajo está unido a devoluciones de llamada en gobject. def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
def watch(f, *other):
print 'reading',f.read()
return True
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
El seleccione módulo ayuda a determinar dónde la siguiente entrada útil es.
Sin embargo, usted es casi siempre más feliz con hilos separados. Uno hace un bloqueo de lectura de la entrada estándar, otro hace dondequiera que se encuentre que no desea bloqueado.
¿Por qué molestarse hilo y cola? a diferencia de readline (), BufferedReader.read1 () suele bloquear la espera de \ r \ n, devuelve ASAP si hay cualquier salida entrando.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
En mi caso necesitaba un módulo de registro que capta la salida de las aplicaciones en segundo plano y lo multiplica (la adición de marcas de tiempo, colores, etc.).
Terminé con un subproceso de fondo que hace la E / S real. Siguiente código es sólo para plataformas POSIX. Me desnudé partes no esenciales.
Si alguien va a utilizar esta bestia de carreras largas consideran la gestión de descriptores abiertos. En mi caso no fue un gran problema.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
He creado una biblioteca basada en J. solución de F. Sebastián. Se puede utilizar.
El trabajo de la respuesta de J. F. Sebastián, y varias otras fuentes, He creado un simple gestor de subproceso. Proporciona la solicitud de lectura sin bloqueo, así como el funcionamiento de varios procesos en paralelo. No utiliza ninguna llamada específica del SO (que yo sepa) y por lo tanto debería funcionar en cualquier lugar.
Está disponible en PyPI, por lo que sólo pip install shelljob
. Consulte el página de ejemplos y documentos completos.
EDIT: Esta aplicación aún bloques. responder lugar.
probé el superior respuesta, pero el riesgo y el mantenimiento del código de hilo adicional era preocupante.
Mirando a través del módulo io ( y se limita a 2,6), encontré BufferedReader. Esta es mi solución sin rosca, sin bloqueo.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Hace poco me encontré con el mismo problema Tengo que leer una línea a la vez de la corriente (la cola se ejecuta en el subproceso) en el modo no bloqueante Quería evitar los problemas siguientes: no quemar la CPU, no lea corriente por un byte (como lo hizo readline), etc.
Aquí está mi aplicación https://gist.github.com/grubberr/5501e1a9760c3eab5e0a que no son compatibles con las ventanas (sondeo), no se ocupan de EOF, pero a mí me funciona bien
Este es un ejemplo para ejecutar comandos interactivo en el sub-proceso, y la salida estándar es interactiva mediante el uso de pseudo-terminal. Se puede hacer referencia a: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
Mi problema es un poco diferente, ya que quería recoger stdout y stderr de un proceso en ejecución, pero en última instancia la misma ya que quería hacer la salida en un widget como su generado.
No quería que recurrir a muchas de las soluciones propuestas usando una cola o hilos adicionales, ya que no deberían ser necesarios para llevar a cabo una tarea tan común como la ejecución de otro guión y la recogida de su salida.
Después de leer las soluciones propuestas y documentación de Python resolví mi problema con la aplicación a continuación. Sí sólo funciona para POSIX como yo estoy usando la llamada de función select
.
Estoy de acuerdo que los documentos son confusas y la puesta en práctica es difícil para una tarea de secuencias de comandos comunes. Creo que las versiones anteriores de Python tienen diferentes valores predeterminados para Popen
y diferentes explicaciones por lo que crearon mucha confusión. Esto parece funcionar bien tanto para Python 3.5.2 y 2.7.12.
La clave era establecer bufsize=1
para el búfer de línea y luego universal_newlines=True
para procesar como un archivo de texto en lugar de un binario que parece ser el valor por defecto cuando se configura bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR, depurar y VERBOSE son simplemente macros que imprimen la salida de la terminal.
Esta solución es mi humilde opinión el 99,99% eficaz, ya que todavía utiliza la función de bloqueo readline
, por lo que asume el proceso secundario es agradable y salidas de líneas completas.
Doy la bienvenida a la retroalimentación para mejorar la solución como soy todavía nuevo en Python.
Esta solución utiliza el módulo select
de "leer todos los datos disponibles" de una corriente IO. Esta función bloquea inicialmente hasta que se dispone de datos, pero entonces sólo lee los datos que están disponibles y no bloquea aún más.
Dado el hecho de que utiliza el módulo select
, esto sólo funciona en Unix.
El código es totalmente compatible PEP8.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
I también enfrentado el problema descrito por Jesse y resuelto mediante el uso de "seleccionar" como Bradley , Andy y otros lo hicieron, pero en un modo de bloqueo para evitar un bucle ocupado. Se utiliza un falso tubo como una entrada estándar falso. Los bloques seleccionados y esperar a que sea la entrada estándar o la tubería para estar listo. la entrada estándar cuando se pulsa una tecla desbloquea el selecto y el valor de la clave puede ser recuperada con capacidad de lectura (1). Cuando un subproceso diferente escribe a continuación el tubo de la tubería desbloquea el selecto y puede tomarse como una indicación de que la necesidad de la entrada estándar ha terminado. Aquí hay un código de referencia:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
Las cosas están mucho mejor en Python moderna.
Esto es un programa simple hijo, "hello.py":
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
Y un programa para interactuar con él:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
que imprime:
b'hello bob\n'
b'hello alice\n'
Tenga en cuenta que el patrón real, que también está en casi todas las respuestas anteriores, tanto aquí como en cuestiones conexas, es establecer descriptor de archivo de salida estándar del niño a no bloqueante y luego sondear en algún tipo de select bucle. En estos días, por supuesto, que bucle es proporcionado por asyncio.
Este es un módulo que soporta sin bloqueo lee y escribe en pitón fondo:
https://pypi.python.org/pypi/python-nonblock
Proporciona una función,
nonblock_read que leerá los datos de la corriente, si está disponible, de lo contrario devuelve una cadena vacía (o Ninguno si la corriente está cerrado en el otro lado y todos los datos posibles se ha leído)
También puede considerar el módulo de python-subprocess2,
https://pypi.python.org/pypi/python-subprocess2
que se suma al módulo de subproceso. Por lo tanto en el objeto de regresar de "subprocess.Popen" se añade un método adicional, runInBackground. Así se inicia un hilo y devuelve un objeto que será automáticamente poblada como enseres se escribe en stdout / stderr, sin bloquear el hilo principal.
Disfrute!