Sem bloqueio de leitura em um subprocess.PIPE em python
-
22-08-2019 - |
Pergunta
Eu estou usando o subprocess módulo para iniciar um subprocesso e conectar a ele de saída fluxo (saída padrão). Eu quero ser capaz de executar non-blocking lê em seu stdout. Existe uma maneira de fazer .readline sem bloqueio ou para verificar se há dados sobre o fluxo antes invoco .readline
? Eu gostaria que isso seja portátil ou, pelo menos, trabalho em Windows e Linux.
aqui é como eu faço isso por agora (ele está bloqueando no .readline
se nenhum dado é avaible):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Solução
fcntl
, select
, asyncproc
não vai ajudar neste caso.
A maneira confiável para ler um fluxo sem bloquear independentemente do sistema operacional é usar Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
Outras dicas
Muitas vezes tive um problema semelhante; programas em Python que eu escrevo com freqüência precisa ter a capacidade de executar algumas funcionalidades primário enquanto aceitando simultaneamente a entrada do usuário a partir da linha de comando (stdin). Basta colocar a funcionalidade manipulação de entrada do usuário em outro segmento não resolve o problema, pois blocos readline()
e não tem limite de tempo. Se a funcionalidade principal está completo e já não há qualquer necessidade de esperar por mais entrada do usuário que normalmente querem o meu programa para sair, mas não pode porque readline()
ainda está bloqueando na outra espera thread para uma linha. A solução que eu encontrei para este problema é fazer com que stdin um arquivo sem bloqueio usando o módulo fcntl:
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
Na minha opinião este é um pouco mais limpo do que usar os módulos selecionados ou de sinal para resolver este problema mas, novamente ele só funciona em UNIX ...
Python 3.4 introduz novo provisória API para IO assíncrono - asyncio
módulo .
A abordagem é semelhante à resposta twisted
baseada em pelo @Bryan Ward - definir um protocolo e seus métodos são chamados assim que os dados está pronto:
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
Há uma asyncio.create_subprocess_exec()
interface de alto nível que os retornos Process
objetos , que permite ler uma linha asynchroniosly usando StreamReader.readline()
coroutine
(Com async
/ await
Python 3.5+ sintaxe ):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
executa readline_and_kill()
as seguintes tarefas:
- começar subprocess, redirecionar sua stdout para um pipe
- ler uma linha de subprocesso stdout de forma assíncrona
- matar subprocess
- esperar por ele para sair
Cada passo poderia ser limitada por segundos de tempo de espera, se necessário.
Tente o asyncproc módulo. Por exemplo:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
O módulo cuida de toda a rosca como sugerido por S. Lott.
Você pode fazer isso muito facilmente em torcida . Dependendo de sua base de código existente, isso pode não ser tão fácil de usar, mas se você está construindo uma aplicação de torcida, então coisas como esta tornam-se quase trivial. Você cria uma classe ProcessProtocol
, e substituir o método outReceived()
. Trançado (dependendo do reator utilizado) é geralmente apenas um loop select()
grande com retornos de chamada instalados para identificador dados de diferentes descritores de arquivos (muitas vezes tomadas de rede). Assim, o método outReceived()
é simplesmente instalando uma chamada de retorno para o tratamento de dados provenientes de STDOUT
. Um exemplo simples que demonstra este comportamento é o seguinte:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
O torcida documentação tem algumas boas informações sobre este assunto.
Se você construir seu aplicativo inteiro em torno Torcido, que torna a comunicação assíncrona com outros processos, local ou remoto, muito elegantes como esta. Por outro lado, se o seu programa não é construído em cima da torcida, isso não é realmente vai ser tão útil. Esperemos que este pode ser útil para outros leitores, mesmo que não é aplicável para a sua aplicação em particular.
Use selecionar e ler (1).
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
Para readline () - como:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
Uma solução é fazer um outro processo para executar sua leitura do processo, ou fazer um segmento do processo com um tempo limite.
Aqui está o modelo com rosca de uma função timeout:
http://code.activestate.com/recipes/473878/
No entanto, você precisa ler o stdout como ele está entrando? Outra solução pode ser a despejar a saída para um arquivo e aguarde o processo terminar usando p.wait () .
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
Disclaimer: isso funciona somente para tornado
Você pode fazer isso definindo o fd ser não bloqueante e depois usar ioloop a registrar retornos de chamada. Tenho embalados isso em um ovo chamado tornado_subprocess e você pode instalá-lo via PyPI:
easy_install tornado_subprocess
Agora você pode fazer algo como isto:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
Você também pode usá-lo com um RequestHandler
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
As soluções existentes não funcionou para mim (detalhes abaixo). O que finalmente funcionou foi implementar readline usando read (1) (com base na esta resposta ). se este último não bloquear:
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
Por que as soluções existentes não funcionou:
- Soluções que requerem readline (incluindo os baseados Fila) sempre bloquear. É difícil (impossível?) Para matar a thread que executa readline. Ele só morre quando o processo que o criou termina, mas não quando o processo de produção de saída é morto.
- Mistura fcntl de baixo nível com chamadas readline de alto nível podem não funcionar corretamente como anonnn assinalou.
- Usando select.poll () é limpo, mas não funciona no Windows de acordo com docs python.
- Usando bibliotecas de terceiros parece um exagero para esta tarefa e adiciona dependências adicionais.
Esta versão do leitura sem bloqueio não requer módulos especiais e irá funcionar out-of-the-box na maioria das distribuições Linux.
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
eu adicionar esse problema para ler alguns stdout subprocess.Popen. Aqui está a minha solução não bloqueio leitura:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
Aqui está o meu código, usado para capturar cada saída do subprocesso o mais rápido possível, incluindo linhas parciais. Ele bombeia ao mesmo tempo e stdout e stderr de forma quase correta.
Testado e corretamente trabalhou em Python 2.7 Linux e Windows.
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Adicionando esta resposta aqui, uma vez que fornece capacidade de tubos set sem bloqueio no Windows e Unix.
Todos os detalhes ctypes
são graças a de @ techtonik resposta .
Existe uma versão ligeiramente modificada para ser usado tanto em sistemas Unix e Windows.
- Python3 compatível (somente pequena alteração necessária) .
- Inclui versão POSIX, e define exceção a utilização para qualquer um.
Desta forma, você pode usar a mesma função e exceção para o código Unix e Windows.
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
Para evitar a leitura de dados incompletos, acabei escrevendo meu próprio gerador readline (que retorna a seqüência de byte para cada linha).
O seu gerador de um modo que você pode, por exemplo ...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
Eu tenho problema da pergunta original, mas não desejo invocar threads. Eu misturei solução de Jesse com uma leitura direta () do tubo, e meu próprio tampão-manipulador para linha lê (no entanto, o meu sub-processo - de ping - sempre escreveu linhas completas O observador é E as principais conjuntos do programa até um ping e depois chama ciclo de correio gobject. Qualquer outro trabalho está ligado a retornos de chamada em gobject. def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
def watch(f, *other):
print 'reading',f.read()
return True
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
O selecionar módulo ajuda a determinar onde o contributo útil seguinte é.
No entanto, você está quase sempre mais feliz com segmentos separados. Um faz um bloqueio ler a stdin, outro faz onde quer que seja que você não quer ser bloqueado.
Por incomodando fio & fila? readline ao contrário (), BufferedReader.read1 () bloco não vai esperar por \ r \ n, ele retorna o mais rápido possível se houver qualquer saída entrando.
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
No meu caso eu precisava de um módulo de registo que as capturas a saída das aplicações do fundo e aumenta-lo (a adição de tempo-selos, cores, etc.).
Eu acabei com uma discussão de fundo que faz o real I / O. Seguinte código é apenas para plataformas POSIX. Tirei partes não essenciais.
Se alguém está indo para usar esta besta para corridas longas considerar gerir descritores abertos. No meu caso, não foi um grande problema.
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
Eu criei uma biblioteca com base em J. solução de F. Sebastian. Você pode usá-lo.
Trabalho de resposta de J.F. Sebastian, e várias outras fontes, eu coloquei um gerente subprocess simples. Ele fornece a leitura do pedido sem bloqueio, bem como executando vários processos em paralelo. Ele não usa qualquer chamada específicos do OS (que eu saiba) e assim deve funcionar em qualquer lugar.
Está disponível a partir pypi, então apenas pip install shelljob
. Consulte o projeto página para exemplos e documentação completa.
EDIT: Esta implementação ainda blocos. responder .
Eu tentei o topo resposta , mas o risco e manutenção de código de segmento adicional era preocupante.
Olhando através do io módulo ( e estar limitado a 2,6), eu encontrei BufferedReader. Esta é a minha rosca, solução non-blocking.
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
Recentemente, tropeçou no mesmo problema Eu preciso ler uma linha de tempo do fluxo (cauda executado em subprocess) no modo de não-bloqueio Eu queria evitar próximos problemas: não queimar cpu, não leia fluxo por um byte (como readline fez), etc
Aqui está minha implementação https://gist.github.com/grubberr/5501e1a9760c3eab5e0a que não suportam janelas (pesquisa), não lidar com EOF, mas funciona para mim bem
Este é um exemplo para executar o comando interactivo no subprocesso, e a saída padrão é interactivo usando pseudo-terminal. Você pode se referir a: https://stackoverflow.com/a/43012138/3555925
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
O meu problema é um pouco diferente como eu queria recolher ambos stdout e stderr de um processo em execução, mas no final o mesmo desde que eu queria processar a saída em um widget como seu gerado.
Eu não queria recorrer a muitas das soluções propostas utilizando Filas ou Tópicos adicionais como eles não devem ser necessário realizar uma tarefa tão comum como a execução de outro script e coleta de sua saída.
Depois de ler as soluções propostas e docs python Eu resolvi meu problema com a implementação abaixo. Sim, só funciona para POSIX como eu estou usando a chamada de função select
.
Eu concordo que os documentos são confusas e a implementação é difícil para tal tarefa scripting comum. Eu acredito que as versões mais antigas do python ter diferentes padrões para Popen
e diferentes explicações para que criou um monte de confusão. Isso parece funcionar bem tanto para Python 2.7.12 e 3.5.2.
A chave era bufsize=1
conjunto para a linha de buffer e, em seguida, universal_newlines=True
ao processo como um arquivo de texto em vez de um binário que parece tornar-se o padrão ao definir bufsize=1
.
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERRO, depurar e VERBOSE são simplesmente macros que a saída de impressão para o terminal.
Esta solução é IMHO 99,99% eficaz, pois ainda usa a função readline
bloqueio, então assumimos o processo de sub é bom e produz linhas completas.
Congratulo-me com o feedback para melhorar a solução como eu ainda sou novo para Python.
Esta solução utiliza o módulo select
de "ler todos os dados disponíveis" a partir de um fluxo de IO. Esta função bloqueia inicialmente até há dados disponíveis, mas, em seguida, lê apenas os dados que estão disponíveis e não bloqueia mais.
Dado o fato de que ele usa o módulo select
, este só funciona em Unix.
O código é totalmente PEP8-compliant.
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
Eu também enfrentou o problema descrito por Jesse e resolvido-lo usando "select", como Bradley , Andy outros fizeram, mas em um modo de bloqueio para evitar um loop ocupado. Ele usa uma tubulação de manequim como um falso stdin. Os blocos selecionados e esperar por tanto stdin ou o tubo para ficar pronto. Quando uma tecla é pressionada stdin desbloqueia a selecionar e o valor da chave pode ser recuperada com a leitura (1). Quando um segmento diferente escreve para o tubo, em seguida, o tubo desbloqueia o seleto e pode ser tomado como uma indicação de que a necessidade de stdin é longo. Aqui está um código de referência:
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
As coisas estão muito melhor em Python moderna.
Aqui está um programa simples criança, "hello.py":
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
E um programa para interagir com ele:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
que imprime:
b'hello bob\n'
b'hello alice\n'
Note que o padrão real, que também está por quase todas as respostas anteriores, tanto aqui como em questões relacionadas, é definir descritor de arquivo stdout da criança para não-bloqueio e, em seguida, poll-lo em algum tipo de select loop. Estes dias, é claro, esse ciclo é fornecido pelo asyncio.
Aqui é um módulo que suporta non-blocking lê e escreve fundo em python:
https://pypi.python.org/pypi/python-nonblock
Fornece uma função,
nonblock_read que vai ler os dados a partir da corrente, se disponível, caso contrário retorna uma cadeia vazia (ou nenhum, se a corrente é fechado no outro lado e de todos os dados foi possível leitura)
Você também pode considerar o módulo python-subprocess2,
https://pypi.python.org/pypi/python-subprocess2
o que aumenta o módulo de subprocesso. Assim, no objeto retornado de "subprocess.Popen" é adicionado um método adicional, runInBackground. Isso inicia um thread e retorna um objeto que será automaticamente preenchido como o material é escrito para stdout / stderr, sem bloquear o thread principal.
Aproveite!