Pregunta

Estoy usando el módulo subprocess para iniciar un subproceso y conectarse a la secuencia de salida (stdout).Quiero ser capaz de ejecutar el bloqueo no lee en su stdout.Es allí una manera de hacer .readline sin bloqueo o para comprobar si hay datos en la secuencia antes de invoco .readline?Me gustaría que este para ser portátil o al menos trabajar en Windows y Linux.

aquí es cómo yo lo hago por ahora (Es el bloqueo en el .readline si no hay datos disponibles):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
¿Fue útil?

Solución

fcntl , select , asyncproc no ayudará en este caso.

Una forma confiable de leer una secuencia sin bloquear independientemente del sistema operativo es utilizar Queue.get_nowait() :

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Otros consejos

A menudo he tenido un problema similar; programas de Python que escribo con frecuencia es necesario tener la capacidad de ejecutar algunas funciones primaria, mientras que al mismo tiempo aceptar la entrada del usuario desde la línea de comandos (entrada estándar). Basta con poner la funcionalidad de manejo de la entrada del usuario en otro hilo no resuelve el problema, ya que los bloques readline() y no tiene tiempo de espera. Si la funcionalidad principal está completo y ya no hay ninguna necesidad de esperar más entradas de usuario que normalmente quiero que mi programa para salir, pero no puede porque readline() se sigue bloqueando en el otro hilo a la espera de una línea. Una solución que he encontrado a este problema es hacer un archivo de entrada estándar sin bloqueo mediante el módulo fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

En mi opinión, esto es un poco más limpio que el uso de los módulos de selección o de señal para resolver este problema pero de nuevo, sólo funciona en UNIX ...

Python 3.4 presenta nuevo provisional API para asíncrono IO - módulo asyncio .

El enfoque es similar a de respuesta basado en twisted por @Bryan Ward, - definir un protocolo y sus métodos se llaman tan pronto como los datos están listos:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

"subproceso" en la documentación .

Hay una asyncio.create_subprocess_exec() interfaz de alto nivel que devuelve Process objetos que permite leer una línea asynchroniosly usando StreamReader.readline() corrutina (Con async / await Python 3.5+ sintaxis ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() realiza las siguientes tareas:

  • iniciar el subproceso, redirigir su salida estándar a un tubo
  • leer una línea del subproceso stdout asíncrona
  • matar subproceso
  • esperar a que salga

Cada paso podría estar limitada por segundo de tiempo de espera si es necesario.

asyncproc módulo. Por ejemplo:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

El módulo se encarga de todo el roscado según lo sugerido por S. Lott.

Esto se puede hacer muy fácilmente en Twisted . Dependiendo de su base de código existente, esto podría no ser tan fácil de usar, pero si usted está construyendo una aplicación torcida, entonces este tipo de cosas convertido casi trivial. Se crea una clase ProcessProtocol, y reemplaza el método outReceived(). Twisted (dependiendo del reactor utilizado) es por lo general sólo un bucle grande select() con devoluciones de llamada instalados para manejar los datos de diferentes descriptores de fichero (con frecuencia conectores de red). Por lo que el método outReceived() es simplemente instalando una devolución de llamada para el manejo de los datos procedentes de STDOUT. Un simple ejemplo que demuestra este comportamiento es como sigue:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

El documentación Twisted tiene una buena información sobre esto.

Si se construye toda su aplicación alrededor de trenzado, que hace que la comunicación asíncrona con otros procesos, locales o remotas, muy elegantes como este. Por otro lado, si el programa no se construye en la parte superior de trenzado, esto no es realmente va a ser de mucha ayuda. Espero que esto puede ser útil para otros lectores, aunque no es aplicable para su aplicación en particular.

Uso seleccionar y leer (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Para readline () - como:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Una solución es hacer otro proceso para realizar su lectura del proceso, o hacer un hilo del proceso con un tiempo de espera.

Aquí está la versión roscada de una función de tiempo de espera:

http://code.activestate.com/recipes/473878/

Sin embargo, no es necesario leer la salida estándar, ya que viene en? Otra solución puede ser para volcar el resultado en un archivo y esperar a que finalice el proceso usando p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

exención de responsabilidad: esto sólo funciona para un tornado

Usted puede hacer esto mediante el establecimiento de la fd ser sin bloqueo y luego usar ioloop para registrar las devoluciones de llamada. He empaquetado esto en un huevo llamado tornado_subprocess y puede instalarlo a través de PyPI:

easy_install tornado_subprocess

Ahora usted puede hacer algo como esto:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

También se puede utilizar con un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Las soluciones existentes no me funciona (la información a continuación).Lo que finalmente se trabajó fue implementar readline usando read(1) (basado en esta respuesta).El último no cuadra:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

¿Por qué las soluciones existentes no funciona:

  1. Las soluciones que requieren readline (incluyendo la Cola de base) siempre en bloque.Es difícil (¿imposible?) para matar el subproceso que ejecuta readline.Sólo se mató cuando el proceso que lo creó acabados, pero no cuando la salida-proceso de producción es asesinado.
  2. Mezcla de bajo nivel fcntl con un alto nivel de readline las llamadas pueden no funcionar correctamente como anonnn ha señalado.
  3. El uso de select.poll() es limpio, pero no funciona en el Windows de acuerdo a python docs.
  4. El uso de librerías de terceros parece excesivo para esta tarea y agrega dependencias adicionales.

Esta versión de la no-bloqueo de lectura no requerir módulos especiales y trabajará fuera de la caja en la mayoría de las distribuciones de Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

agrego este problema para leer algunos stdout subprocess.Popen. Aquí está mi solución no lectura de bloqueo:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Aquí está mi código, para la captura de cada salida del subproceso lo antes posible, incluyendo las líneas parciales. Bombea al mismo tiempo y stdout y stderr en orden casi correcta.

Probado y correctamente trabajado en Python 2.7 Linux y Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

La adición de esta respuesta aquí, ya que proporciona capacidad de fijar tuberías no bloqueantes en Windows y Unix.

Todos los detalles ctypes son gracias a @ techtonik de respuesta .

Hay una versión ligeramente modificados para ser utilizados tanto en los sistemas Unix y Windows.

  • compatibles python3 (sólo cambios menores necesario) .
  • Incluye versión posix, y define excepción de usar para cualquiera.

De esta manera se puede utilizar la misma función y una excepción para el código de Unix y Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Para evitar la lectura de datos incompletos, terminé de escribir mi propio generador de readline (que devuelve la cadena de bytes para cada línea).

Es un generador para que pueda, por ejemplo ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Tengo un problema del que pregunta original, pero no deseo invocar hilos. He mezclado solución de Jesse con una lectura directa () de la tubería, y mi propio buffer-manejador de línea se lee (sin embargo, mi sub-proceso - de ping - siempre escribió líneas completas

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

El observador es

def watch(f, *other):
print 'reading',f.read()
return True

Y el programa principal establece un ping y luego llama bucle de correo gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Cualquier otro trabajo está unido a devoluciones de llamada en gobject.

El seleccione módulo ayuda a determinar dónde la siguiente entrada útil es.

Sin embargo, usted es casi siempre más feliz con hilos separados. Uno hace un bloqueo de lectura de la entrada estándar, otro hace dondequiera que se encuentre que no desea bloqueado.

¿Por qué molestarse hilo y cola? a diferencia de readline (), BufferedReader.read1 () suele bloquear la espera de \ r \ n, devuelve ASAP si hay cualquier salida entrando.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

En mi caso necesitaba un módulo de registro que capta la salida de las aplicaciones en segundo plano y lo multiplica (la adición de marcas de tiempo, colores, etc.).

Terminé con un subproceso de fondo que hace la E / S real. Siguiente código es sólo para plataformas POSIX. Me desnudé partes no esenciales.

Si alguien va a utilizar esta bestia de carreras largas consideran la gestión de descriptores abiertos. En mi caso no fue un gran problema.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

He creado una biblioteca basada en J. solución de F. Sebastián. Se puede utilizar.

https://github.com/cenkalti/what

El trabajo de la respuesta de J. F. Sebastián, y varias otras fuentes, He creado un simple gestor de subproceso. Proporciona la solicitud de lectura sin bloqueo, así como el funcionamiento de varios procesos en paralelo. No utiliza ninguna llamada específica del SO (que yo sepa) y por lo tanto debería funcionar en cualquier lugar.

Está disponible en PyPI, por lo que sólo pip install shelljob. Consulte el página de ejemplos y documentos completos.

EDIT: Esta aplicación aún bloques. responder lugar.

probé el superior respuesta, pero el riesgo y el mantenimiento del código de hilo adicional era preocupante.

Mirando a través del módulo io ( y se limita a 2,6), encontré BufferedReader. Esta es mi solución sin rosca, sin bloqueo.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Hace poco me encontré con el mismo problema Tengo que leer una línea a la vez de la corriente (la cola se ejecuta en el subproceso) en el modo no bloqueante Quería evitar los problemas siguientes: no quemar la CPU, no lea corriente por un byte (como lo hizo readline), etc.

Aquí está mi aplicación https://gist.github.com/grubberr/5501e1a9760c3eab5e0a que no son compatibles con las ventanas (sondeo), no se ocupan de EOF, pero a mí me funciona bien

Este es un ejemplo para ejecutar comandos interactivo en el sub-proceso, y la salida estándar es interactiva mediante el uso de pseudo-terminal. Se puede hacer referencia a: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Mi problema es un poco diferente, ya que quería recoger stdout y stderr de un proceso en ejecución, pero en última instancia la misma ya que quería hacer la salida en un widget como su generado.

No quería que recurrir a muchas de las soluciones propuestas usando una cola o hilos adicionales, ya que no deberían ser necesarios para llevar a cabo una tarea tan común como la ejecución de otro guión y la recogida de su salida.

Después de leer las soluciones propuestas y documentación de Python resolví mi problema con la aplicación a continuación. Sí sólo funciona para POSIX como yo estoy usando la llamada de función select.

Estoy de acuerdo que los documentos son confusas y la puesta en práctica es difícil para una tarea de secuencias de comandos comunes. Creo que las versiones anteriores de Python tienen diferentes valores predeterminados para Popen y diferentes explicaciones por lo que crearon mucha confusión. Esto parece funcionar bien tanto para Python 3.5.2 y 2.7.12.

La clave era establecer bufsize=1 para el búfer de línea y luego universal_newlines=True para procesar como un archivo de texto en lugar de un binario que parece ser el valor por defecto cuando se configura bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, depurar y VERBOSE son simplemente macros que imprimen la salida de la terminal.

Esta solución es mi humilde opinión el 99,99% eficaz, ya que todavía utiliza la función de bloqueo readline, por lo que asume el proceso secundario es agradable y salidas de líneas completas.

Doy la bienvenida a la retroalimentación para mejorar la solución como soy todavía nuevo en Python.

Esta solución utiliza el módulo select de "leer todos los datos disponibles" de una corriente IO. Esta función bloquea inicialmente hasta que se dispone de datos, pero entonces sólo lee los datos que están disponibles y no bloquea aún más.

Dado el hecho de que utiliza el módulo select, esto sólo funciona en Unix.

El código es totalmente compatible PEP8.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

I también enfrentado el problema descrito por Jesse y resuelto mediante el uso de "seleccionar" como Bradley , Andy y otros lo hicieron, pero en un modo de bloqueo para evitar un bucle ocupado. Se utiliza un falso tubo como una entrada estándar falso. Los bloques seleccionados y esperar a que sea la entrada estándar o la tubería para estar listo. la entrada estándar cuando se pulsa una tecla desbloquea el selecto y el valor de la clave puede ser recuperada con capacidad de lectura (1). Cuando un subproceso diferente escribe a continuación el tubo de la tubería desbloquea el selecto y puede tomarse como una indicación de que la necesidad de la entrada estándar ha terminado. Aquí hay un código de referencia:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

Las cosas están mucho mejor en Python moderna.

Esto es un programa simple hijo, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

Y un programa para interactuar con él:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

que imprime:

b'hello bob\n'
b'hello alice\n'

Tenga en cuenta que el patrón real, que también está en casi todas las respuestas anteriores, tanto aquí como en cuestiones conexas, es establecer descriptor de archivo de salida estándar del niño a no bloqueante y luego sondear en algún tipo de select bucle. En estos días, por supuesto, que bucle es proporcionado por asyncio.

Este es un módulo que soporta sin bloqueo lee y escribe en pitón fondo:

https://pypi.python.org/pypi/python-nonblock

Proporciona una función,

nonblock_read que leerá los datos de la corriente, si está disponible, de lo contrario devuelve una cadena vacía (o Ninguno si la corriente está cerrado en el otro lado y todos los datos posibles se ha leído)

También puede considerar el módulo de python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

que se suma al módulo de subproceso. Por lo tanto en el objeto de regresar de "subprocess.Popen" se añade un método adicional, runInBackground. Así se inicia un hilo y devuelve un objeto que será automáticamente poblada como enseres se escribe en stdout / stderr, sin bloquear el hilo principal.

Disfrute!

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top