Pergunta

Eu estou usando o subprocess módulo para iniciar um subprocesso e conectar a ele de saída fluxo (saída padrão). Eu quero ser capaz de executar non-blocking lê em seu stdout. Existe uma maneira de fazer .readline sem bloqueio ou para verificar se há dados sobre o fluxo antes invoco .readline? Eu gostaria que isso seja portátil ou, pelo menos, trabalho em Windows e Linux.

aqui é como eu faço isso por agora (ele está bloqueando no .readline se nenhum dado é avaible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Foi útil?

Solução

fcntl , select , asyncproc não vai ajudar neste caso.

A maneira confiável para ler um fluxo sem bloquear independentemente do sistema operacional é usar Queue.get_nowait() :

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Outras dicas

Muitas vezes tive um problema semelhante; programas em Python que eu escrevo com freqüência precisa ter a capacidade de executar algumas funcionalidades primário enquanto aceitando simultaneamente a entrada do usuário a partir da linha de comando (stdin). Basta colocar a funcionalidade manipulação de entrada do usuário em outro segmento não resolve o problema, pois blocos readline() e não tem limite de tempo. Se a funcionalidade principal está completo e já não há qualquer necessidade de esperar por mais entrada do usuário que normalmente querem o meu programa para sair, mas não pode porque readline() ainda está bloqueando na outra espera thread para uma linha. A solução que eu encontrei para este problema é fazer com que stdin um arquivo sem bloqueio usando o módulo fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

Na minha opinião este é um pouco mais limpo do que usar os módulos selecionados ou de sinal para resolver este problema mas, novamente ele só funciona em UNIX ...

Python 3.4 introduz novo provisória API para IO assíncrono - asyncio módulo .

A abordagem é semelhante à resposta twisted baseada em pelo @Bryan Ward - definir um protocolo e seus métodos são chamados assim que os dados está pronto:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

"Subprocess" nos docs .

Há uma asyncio.create_subprocess_exec() interface de alto nível que os retornos Process objetos , que permite ler uma linha asynchroniosly usando StreamReader.readline() coroutine (Com async / await Python 3.5+ sintaxe ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

executa readline_and_kill() as seguintes tarefas:

  • começar subprocess, redirecionar sua stdout para um pipe
  • ler uma linha de subprocesso stdout de forma assíncrona
  • matar subprocess
  • esperar por ele para sair

Cada passo poderia ser limitada por segundos de tempo de espera, se necessário.

Tente o asyncproc módulo. Por exemplo:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

O módulo cuida de toda a rosca como sugerido por S. Lott.

Você pode fazer isso muito facilmente em torcida . Dependendo de sua base de código existente, isso pode não ser tão fácil de usar, mas se você está construindo uma aplicação de torcida, então coisas como esta tornam-se quase trivial. Você cria uma classe ProcessProtocol, e substituir o método outReceived(). Trançado (dependendo do reator utilizado) é geralmente apenas um loop select() grande com retornos de chamada instalados para identificador dados de diferentes descritores de arquivos (muitas vezes tomadas de rede). Assim, o método outReceived() é simplesmente instalando uma chamada de retorno para o tratamento de dados provenientes de STDOUT. Um exemplo simples que demonstra este comportamento é o seguinte:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

O torcida documentação tem algumas boas informações sobre este assunto.

Se você construir seu aplicativo inteiro em torno Torcido, que torna a comunicação assíncrona com outros processos, local ou remoto, muito elegantes como esta. Por outro lado, se o seu programa não é construído em cima da torcida, isso não é realmente vai ser tão útil. Esperemos que este pode ser útil para outros leitores, mesmo que não é aplicável para a sua aplicação em particular.

Use selecionar e ler (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Para readline () - como:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Uma solução é fazer um outro processo para executar sua leitura do processo, ou fazer um segmento do processo com um tempo limite.

Aqui está o modelo com rosca de uma função timeout:

http://code.activestate.com/recipes/473878/

No entanto, você precisa ler o stdout como ele está entrando? Outra solução pode ser a despejar a saída para um arquivo e aguarde o processo terminar usando p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

Disclaimer: isso funciona somente para tornado

Você pode fazer isso definindo o fd ser não bloqueante e depois usar ioloop a registrar retornos de chamada. Tenho embalados isso em um ovo chamado tornado_subprocess e você pode instalá-lo via PyPI:

easy_install tornado_subprocess

Agora você pode fazer algo como isto:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

Você também pode usá-lo com um RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

As soluções existentes não funcionou para mim (detalhes abaixo). O que finalmente funcionou foi implementar readline usando read (1) (com base na esta resposta ). se este último não bloquear:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Por que as soluções existentes não funcionou:

  1. Soluções que requerem readline (incluindo os baseados Fila) sempre bloquear. É difícil (impossível?) Para matar a thread que executa readline. Ele só morre quando o processo que o criou termina, mas não quando o processo de produção de saída é morto.
  2. Mistura fcntl de baixo nível com chamadas readline de alto nível podem não funcionar corretamente como anonnn assinalou.
  3. Usando select.poll () é limpo, mas não funciona no Windows de acordo com docs python.
  4. Usando bibliotecas de terceiros parece um exagero para esta tarefa e adiciona dependências adicionais.

Esta versão do leitura sem bloqueio não requer módulos especiais e irá funcionar out-of-the-box na maioria das distribuições Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

eu adicionar esse problema para ler alguns stdout subprocess.Popen. Aqui está a minha solução não bloqueio leitura:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Aqui está o meu código, usado para capturar cada saída do subprocesso o mais rápido possível, incluindo linhas parciais. Ele bombeia ao mesmo tempo e stdout e stderr de forma quase correta.

Testado e corretamente trabalhou em Python 2.7 Linux e Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

Adicionando esta resposta aqui, uma vez que fornece capacidade de tubos set sem bloqueio no Windows e Unix.

Todos os detalhes ctypes são graças a de @ techtonik resposta .

Existe uma versão ligeiramente modificada para ser usado tanto em sistemas Unix e Windows.

  • Python3 compatível (somente pequena alteração necessária) .
  • Inclui versão POSIX, e define exceção a utilização para qualquer um.

Desta forma, você pode usar a mesma função e exceção para o código Unix e Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Para evitar a leitura de dados incompletos, acabei escrevendo meu próprio gerador readline (que retorna a seqüência de byte para cada linha).

O seu gerador de um modo que você pode, por exemplo ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Eu tenho problema da pergunta original, mas não desejo invocar threads. Eu misturei solução de Jesse com uma leitura direta () do tubo, e meu próprio tampão-manipulador para linha lê (no entanto, o meu sub-processo - de ping - sempre escreveu linhas completas

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

O observador é

def watch(f, *other):
print 'reading',f.read()
return True

E as principais conjuntos do programa até um ping e depois chama ciclo de correio gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Qualquer outro trabalho está ligado a retornos de chamada em gobject.

O selecionar módulo ajuda a determinar onde o contributo útil seguinte é.

No entanto, você está quase sempre mais feliz com segmentos separados. Um faz um bloqueio ler a stdin, outro faz onde quer que seja que você não quer ser bloqueado.

Por incomodando fio & fila? readline ao contrário (), BufferedReader.read1 () bloco não vai esperar por \ r \ n, ele retorna o mais rápido possível se houver qualquer saída entrando.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

No meu caso eu precisava de um módulo de registo que as capturas a saída das aplicações do fundo e aumenta-lo (a adição de tempo-selos, cores, etc.).

Eu acabei com uma discussão de fundo que faz o real I / O. Seguinte código é apenas para plataformas POSIX. Tirei partes não essenciais.

Se alguém está indo para usar esta besta para corridas longas considerar gerir descritores abertos. No meu caso, não foi um grande problema.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

Eu criei uma biblioteca com base em J. solução de F. Sebastian. Você pode usá-lo.

https://github.com/cenkalti/what

Trabalho de resposta de J.F. Sebastian, e várias outras fontes, eu coloquei um gerente subprocess simples. Ele fornece a leitura do pedido sem bloqueio, bem como executando vários processos em paralelo. Ele não usa qualquer chamada específicos do OS (que eu saiba) e assim deve funcionar em qualquer lugar.

Está disponível a partir pypi, então apenas pip install shelljob. Consulte o projeto página para exemplos e documentação completa.

EDIT: Esta implementação ainda blocos. responder .

Eu tentei o topo resposta , mas o risco e manutenção de código de segmento adicional era preocupante.

Olhando através do io módulo ( e estar limitado a 2,6), eu encontrei BufferedReader. Esta é a minha rosca, solução non-blocking.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Recentemente, tropeçou no mesmo problema Eu preciso ler uma linha de tempo do fluxo (cauda executado em subprocess) no modo de não-bloqueio Eu queria evitar próximos problemas: não queimar cpu, não leia fluxo por um byte (como readline fez), etc

Aqui está minha implementação https://gist.github.com/grubberr/5501e1a9760c3eab5e0a que não suportam janelas (pesquisa), não lidar com EOF, mas funciona para mim bem

Este é um exemplo para executar o comando interactivo no subprocesso, e a saída padrão é interactivo usando pseudo-terminal. Você pode se referir a: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

O meu problema é um pouco diferente como eu queria recolher ambos stdout e stderr de um processo em execução, mas no final o mesmo desde que eu queria processar a saída em um widget como seu gerado.

Eu não queria recorrer a muitas das soluções propostas utilizando Filas ou Tópicos adicionais como eles não devem ser necessário realizar uma tarefa tão comum como a execução de outro script e coleta de sua saída.

Depois de ler as soluções propostas e docs python Eu resolvi meu problema com a implementação abaixo. Sim, só funciona para POSIX como eu estou usando a chamada de função select.

Eu concordo que os documentos são confusas e a implementação é difícil para tal tarefa scripting comum. Eu acredito que as versões mais antigas do python ter diferentes padrões para Popen e diferentes explicações para que criou um monte de confusão. Isso parece funcionar bem tanto para Python 2.7.12 e 3.5.2.

A chave era bufsize=1 conjunto para a linha de buffer e, em seguida, universal_newlines=True ao processo como um arquivo de texto em vez de um binário que parece tornar-se o padrão ao definir bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERRO, depurar e VERBOSE são simplesmente macros que a saída de impressão para o terminal.

Esta solução é IMHO 99,99% eficaz, pois ainda usa a função readline bloqueio, então assumimos o processo de sub é bom e produz linhas completas.

Congratulo-me com o feedback para melhorar a solução como eu ainda sou novo para Python.

Esta solução utiliza o módulo select de "ler todos os dados disponíveis" a partir de um fluxo de IO. Esta função bloqueia inicialmente até há dados disponíveis, mas, em seguida, lê apenas os dados que estão disponíveis e não bloqueia mais.

Dado o fato de que ele usa o módulo select, este só funciona em Unix.

O código é totalmente PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

Eu também enfrentou o problema descrito por Jesse e resolvido-lo usando "select", como Bradley , Andy outros fizeram, mas em um modo de bloqueio para evitar um loop ocupado. Ele usa uma tubulação de manequim como um falso stdin. Os blocos selecionados e esperar por tanto stdin ou o tubo para ficar pronto. Quando uma tecla é pressionada stdin desbloqueia a selecionar e o valor da chave pode ser recuperada com a leitura (1). Quando um segmento diferente escreve para o tubo, em seguida, o tubo desbloqueia o seleto e pode ser tomado como uma indicação de que a necessidade de stdin é longo. Aqui está um código de referência:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

As coisas estão muito melhor em Python moderna.

Aqui está um programa simples criança, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

E um programa para interagir com ele:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

que imprime:

b'hello bob\n'
b'hello alice\n'

Note que o padrão real, que também está por quase todas as respostas anteriores, tanto aqui como em questões relacionadas, é definir descritor de arquivo stdout da criança para não-bloqueio e, em seguida, poll-lo em algum tipo de select loop. Estes dias, é claro, esse ciclo é fornecido pelo asyncio.

Aqui é um módulo que suporta non-blocking lê e escreve fundo em python:

https://pypi.python.org/pypi/python-nonblock

Fornece uma função,

nonblock_read que vai ler os dados a partir da corrente, se disponível, caso contrário retorna uma cadeia vazia (ou nenhum, se a corrente é fechado no outro lado e de todos os dados foi possível leitura)

Você também pode considerar o módulo python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

o que aumenta o módulo de subprocesso. Assim, no objeto retornado de "subprocess.Popen" é adicionado um método adicional, runInBackground. Isso inicia um thread e retorna um objeto que será automaticamente preenchido como o material é escrito para stdout / stderr, sem bloquear o thread principal.

Aproveite!

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top