Domanda

Sto utilizzando il sottoprocesso modulo per avviare un processo secondario e collegare ad esso il flusso di output (stdout).Voglio essere in grado di eseguire non-blocking legge sul suo stdout.C'è un modo per fare .readline non-blocking o per controllare se ci sono dati sul flusso prima di richiamare .readline?Mi piacerebbe questo, per essere portatile, o almeno lavorare sotto Windows che sotto Linux.

ecco come fare per ora (È il blocco sul .readline se non ci sono dati disponibili):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
È stato utile?

Soluzione

fcntl , select , asyncproc non aiuterà in questo caso.

Un modo affidabile per leggere un flusso senza bloccare indipendentemente dal sistema operativo è usare Queue.get_nowait() :

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Altri suggerimenti

Più volte ho avuto un problema simile; programmi Python Scrivo spesso necessario avere la capacità di eseguire alcune funzionalità primaria e contemporaneamente accettare input dell'utente dalla riga di comando (stdin). Semplicemente mettendo la funzionalità movimentazione input dell'utente in un altro filo non risolve il problema perché blocchi readline() e non ha timeout. Se la funzionalità principale è completo e non v'è più alcuna necessità di attendere ulteriori input dell'utente Io di solito voglio che il mio programma per uscire, ma non può perché readline() è ancora bloccando in altro thread in attesa di una linea. Una soluzione che ho trovato a questo problema è quello di rendere un file stdin non bloccante utilizzando il modulo fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

A mio parere questo è un po 'più pulita rispetto all'utilizzo dei moduli di selezione o di segnale per risolvere questo problema, ma poi di nuovo funziona solo su UNIX ...

Python 3.4 introduce la nuova provvisorio API per asincrona IO - modulo asyncio .

L'approccio è simile a risposta twisted-based @Bryan Ward - definire un protocollo ei suoi metodi sono chiamati non appena i dati sono pronti:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

"sottoprocesso" nella documentazione .

C'è un'interfaccia asyncio.create_subprocess_exec() di alto livello che restituisce Process oggetti che permette di leggere una riga asynchroniosly utilizzando StreamReader.readline() coroutine (Con async / await Python 3.5+ sintassi ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() esegue le seguenti attività:

  • avviare sottoprocesso, riorientare la sua stdout ad un tubo
  • leggere una linea da sottoprocesso stdout in modo asincrono
  • uccidere sottoprocesso
  • attendere che per uscire

Ogni passo potrebbe essere limitata da timeout secondi se necessario.

Prova il asyncproc modulo. Ad esempio:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Il modulo si prende cura di tutto il threading come suggerito da S. Lott.

È possibile farlo molto facilmente in ritorto . A seconda della vostra base di codice esistente, questo potrebbe non essere così facile da usare, ma se si sta costruendo un'applicazione contorto, quindi le cose come questa diventato quasi banale. Si crea una classe ProcessProtocol, e l'override del metodo outReceived(). Ritorto (a seconda del reattore utilizzato) è di solito solo un ciclo grande select() con richiamate installati per gestire i dati provenienti da diversi descrittori di file (spesso prese di rete). Quindi il metodo outReceived() è sufficiente installare un callback per la gestione di dati provenienti da STDOUT. Un semplice esempio che dimostra questo comportamento è la seguente:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Il documentazione ritorto ha alcune buone informazioni su questo.

Se si costruisce l'intera applicazione intorno Contorto, rende la comunicazione asincrona con altri processi, locali o remoti, davvero eleganti come questo. D'altra parte, se il programma non è costruito sulla base di Twisted, questo non è davvero essere che utile. Speriamo che questo può essere utile per gli altri lettori, anche se non è applicabile per una particolare applicazione.

Usa selezionare e leggere (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Per readline () - come:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Una soluzione è quella di fare un altro processo per eseguire la lettura del processo, o fare un thread del processo con un timeout.

Ecco la versione filettata di una funzione di timeout:

http://code.activestate.com/recipes/473878/

Tuttavia, non è necessario leggere lo stdout come è in arrivo? Un'altra soluzione potrebbe essere quella di scaricare l'output in un file e attendere il termine del processo utilizzando p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

Esonero di responsabilità: questo funziona solo per i tornado

È possibile farlo impostando il fd di essere non bloccante e quindi utilizzare ioloop per registrare callback. Ho confezionato questo in un uovo chiamato tornado_subprocess e è possibile installarlo tramite PyPI:

easy_install tornado_subprocess

ora si può fare qualcosa di simile:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

è possibile anche usarlo con un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Le soluzioni esistenti non hanno funzionato per me (dettagli sotto). Ciò che alla fine ha lavorato è stato quello di implementare readline usando lettura (1) (sulla base di questa risposta ). Quest'ultimo non blocca:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Perché le soluzioni esistenti non hanno funzionato:

  1. Soluzioni che richiedono readline (compresi quelli a base di coda) sempre bloccare. E 'difficile (impossibile?) Per uccidere il thread che esegue readline. E 'solo viene ucciso quando il processo che lo ha creato finisce, ma non quando il processo di output che producono viene ucciso.
  2. Mixing fcntl a basso livello con chiamate readline di alto livello potrebbe non funzionare correttamente come anonnn ha sottolineato.
  3. Uso select.poll () è pulito, ma non funziona su Windows in base alla documentazione Python.
  4. Utilizzo di librerie di terze parti sembra eccessivo per questo compito e aggiunge ulteriori dipendenze.

Questa versione di non-blocking leggere non richiedere moduli speciali e lavorerà out-of-the-box sulla maggior parte delle distribuzioni Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

aggiungo questo problema di leggere alcuni subprocess.Popen stdout. Ecco la mia soluzione non bloccante lettura:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Ecco il mio codice, utilizzato per la cattura di ogni uscita da sottoprocesso ASAP, comprese le linee parziali. Esso pompe allo stesso tempo e stdout e stderr per quasi corretto.

Testato e correttamente lavorato su Python 2.7 Linux e Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

L'aggiunta di questa risposta qui in quanto fornisce la capacità di impostare i tubi non bloccanti su Windows e Unix.

Tutti i dettagli sono ctypes grazie a @ di techtonik risposta .

C'è una versione leggermente modificata per essere utilizzato sia su sistemi Unix e Windows.

  • python3 compatibile (solo piccola modifica necessaria) .
  • include la versione POSIX, e definisce eccezioni da utilizzare per entrambi.

In questo modo è possibile utilizzare la stessa funzione e l'eccezione per il codice Unix e Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Per evitare la lettura di dati incompleti, ho finito per scrivere il mio generatore di readline (che restituisce la stringa di byte per ogni linea).

E 'un generatore in modo da poter, ad esempio ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Non ho il problema del interrogante originale, ma non ho voluto richiamare le discussioni. Ho mescolato la soluzione di Jesse con una lettura diretta () dal tubo, e il mio proprio buffer-handler per riga si legge (tuttavia, il mio sub-processo - ping - sempre scritto linee complete

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

L'osservatore è

def watch(f, *other):
print 'reading',f.read()
return True

E il programma principale imposta un ping e quindi chiama loop di posta gobject.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Qualsiasi altra opera è collegata al callback in gobject.

Il selezionare modulo consente di determinare dove il prossimo ingresso utile è.

Tuttavia, si è quasi sempre più felice con thread separati. Uno non un blocco lettura stdin, un altro fa dovunque sia che non si desidera bloccato.

Perché preoccuparsi filo e la fila? a differenza di readline (), BufferedReader.read1 () wont blocco in attesa di \ r \ n, restituisce ASAP se c'è qualche uscita in arrivo.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

Nel mio caso avevo bisogno di un modulo di registrazione che cattura l'uscita dalle applicazioni in background e aumenta esso (l'aggiunta di time-stamp, colori, ecc.).

Ho finito con un thread in background che fa l'attuale di I / O. A seguito di codice è solo per le piattaforme POSIX. Ho messo a nudo le parti non essenziali.

Se qualcuno sta per utilizzare questa bestia per long run considerano la gestione di descrittori aperti. Nel mio caso non è stato un grosso problema.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

Ho creato una libreria basata su J. soluzione di F. Sebastian. Si può usare.

https://github.com/cenkalti/what

Lavorare da risposta di J.F. Sebastian, e diverse altre fonti, ho messo insieme un semplice gestore di sottoprocesso. Esso fornisce la lettura non bloccante richiesta, così come l'esecuzione di più processi in parallelo. Non usa alcuna chiamata specifico sistema operativo (che io sappia) e quindi dovrebbe lavorare ovunque.

E 'disponibile da PyPI, quindi basta pip install shelljob. Fare riferimento alla pagina per esempi e documentazione completa.

EDIT: Questa implementazione ancora blocchi. rispondere .

ho provato il risposta superiore , ma il rischio e la manutenzione del codice thread aggiuntivo era preoccupante.

Guardando attraverso il io modulo ( e di essere limitata a 2,6), ho trovato BufferedReader. Questo è il mio Threadless, una soluzione non-blocking.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Recentemente ho incappato nello stesso problema Ho bisogno di leggere una riga alla volta dal flusso (coda eseguito in sottoprocesso) in modo non bloccante Volevo evitare successivi problemi: non bruciare cpu, non leggono flusso da un byte (come readline ha fatto), etc

Ecco la mia implementazione https://gist.github.com/grubberr/5501e1a9760c3eab5e0a che non supportano le finestre (sondaggio), non gestiscono EOF, ma per me funziona bene

Questo è un esempio di eseguire il comando interattiva in sottoprocesso, e stdout è interattiva utilizzando pseudo terminale. Si può fare riferimento a: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Il mio problema è un po ' diversa, come ho voluto raccogliere sia stdout e stderr da un processo in esecuzione, ma in definitiva la stessa da quando ho voluto rendere l'output in un widget come la sua generato.

Non volevo ricorrere a molte delle proposte di soluzioni alternative utilizzando Code o ulteriori Thread come questi non dovrebbe essere necessario eseguire un compito comune, come l'esecuzione di un altro script e raccogliendo la sua uscita.

Dopo aver letto la proposta di soluzioni e python-docs ho risolto il mio problema con l'implementazione di seguito.Sì, ma funziona solo per POSIX come sto usando il select chiamata di funzione.

Sono d'accordo che i documenti sono confuso e l'implementazione è a disagio per il comune di scripting attività.Credo che le vecchie versioni di python hanno diversi valori predefiniti per Popen e diverse spiegazioni così che ha creato un sacco di confusione.Questo sembra funzionare bene sia per Python 2.7.12 e 3.5.2.

La chiave è stato quello di impostare bufsize=1 per la linea di buffering e poi universal_newlines=True a processo come un file di testo invece di un binario che sembra diventare l'impostazione predefinita quando bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

Di ERRORE, eseguire il DEBUG e DETTAGLIATO, sono semplicemente le macro stampare l'output del terminale.

Questa soluzione è IMHO il 99,99% di efficacia in quanto utilizza ancora il blocco readline funzione, quindi si suppone che il processo di sub è bello e uscite di linee complete.

Accolgo con favore il feedback per migliorare la soluzione, io sono ancora nuovo a Python.

Questa soluzione utilizza il modulo select di "leggere tutti i dati disponibili" da un flusso di IO. Questa funzione blocca inizialmente fino sono disponibili i dati, ma poi legge solo i dati che è disponibile e non bloccare ulteriormente.

Dato il fatto che utilizza il modulo select, questo funziona solo su Unix.

Il codice è completamente PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

Ho anche affrontato il problema descritto da Jesse e risolto utilizzando "seleziona" il Bradley , Andy altri hanno fatto, ma in una modalità di blocco per evitare un ciclo occupato. Esso utilizza un tubo fittizio come stdin falso. I blocchi di selezione e attendere che sia stdin o il tubo per essere pronti. stdin quando viene premuto un tasto sblocca la selezione e il valore di chiave può essere recuperato con lettura (1). Quando un thread diverso scrive al tubo poi il tubo sblocca il prescelto e può essere considerato come un'indicazione che la necessità di stdin è finita. Ecco il codice di riferimento:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

Le cose vanno molto meglio in Python moderna.

Ecco un semplice programma di bambino "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

E un programma di interagire con esso:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

che stampa:

b'hello bob\n'
b'hello alice\n'

Si noti che l'andamento reale, che è anche da quasi tutte le risposte precedenti, sia qui che in questioni correlate, è quello di impostare descrittore di file stdout del bambino a non bloccante e poi sondaggio è in una sorta di selezionare loop. In questi giorni, ovviamente, quel ciclo è fornito da asyncio.

Qui è un modulo che supporta non bloccante legge e sfondo scrive in Python:

https://pypi.python.org/pypi/python-nonblock

fornisce una funzione,

nonblock_read che leggere dati dal flusso, se disponibile, altrimenti restituisce una stringa vuota (o None se il flusso è chiuso sul lato opposto e tutti i dati possibili sono stati letti)

Si può anche prendere in considerazione il modulo python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

che aggiunge al modulo sottoprocesso. Così via l'oggetto restituito da "subprocess.Popen" viene aggiunto un ulteriore metodo, runInBackground. Questo avvia un thread e restituisce un oggetto che verrà automaticamente popolato come roba viene scritto output / error, senza bloccare il thread principale.

Enjoy!

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top