Question

J'utilise le pour le module subprocess lancer un sous-processus et se connecter à sa sortie courant (stdout). Je veux être en mesure d'exécuter non-blocage se lit sur son stdout. Y at-il un moyen de faire .readline non-blocage ou pour vérifier s'il y a des données sur le flux avant que je .readline invoque? Je voudrais que ce soit portable ou tout au moins fonctionner sous Windows et Linux.

Voici comment je le fais pour l'instant (il bloque sur le .readline si aucune donnée est avaible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
Était-ce utile?

La solution

fcntl , select , asyncproc ne sera pas utile dans ce cas.

Un moyen fiable de lire un flux sans bloquer indépendamment du système d'exploitation est d'utiliser Queue.get_nowait() :

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

Autres conseils

Je l'ai souvent eu un problème similaire; programmes Python que j'écris souvent besoin d'avoir la capacité d'exécuter certaines fonctionnalités primaire tout en acceptant simultanément l'entrée d'utilisateur à partir de la ligne de commande (stdin). Il suffit de mettre la fonctionnalité utilisateur gestion de l'entrée dans un autre thread ne résout pas le problème, car les blocs de readline() et n'a pas de délai. Si la fonctionnalité principale est terminée et il n'y a plus besoin d'attendre d'autres entrées utilisateur Je veux généralement mon programme pour sortir, mais il ne peut pas parce que readline() bloque toujours dans l'autre thread en attente d'une ligne. Une solution que j'ai trouvé à ce problème est de faire stdin un fichier non bloquant à l'aide du module fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

est à mon avis un peu plus propre que d'utiliser les modules de sélection ou de signal pour résoudre ce problème, mais là encore il ne fonctionne que sous UNIX ...

Python 3.4 introduit une nouvelle API provisoire pour IO asynchrone - module asyncio .

L'approche est similaire à réponse basée twisted par @Bryan Ward - définir un protocole et ses méthodes sont appelées dès que les données sont prêtes:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Voir "Subprocess" dans la documentation .

Il y a un asyncio.create_subprocess_exec() d'interface de haut niveau qui renvoie des objets Process qui permet de lire une ligne asynchroniosly en utilisant StreamReader.readline() coroutine (Avec async / await Python 3.5+ syntaxe ):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() effectue les tâches suivantes:

  • démarrer le sous-processus, rediriger sa sortie standard à une conduite
  • lire une ligne de sous-processus sortie standard asynchrone
  • tuer subprocess
  • attendre pour quitter

Chaque étape peut être limitée par timeout secondes si nécessaire.

Essayez le module asyncproc . Par exemple:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Le module prend en charge tout le filetage comme suggéré par S. Lott.

Vous pouvez le faire très facilement dans Twisted . En fonction de votre base de code existant, cela pourrait ne pas être facile à utiliser, mais si vous construisez une application tordue, alors les choses comme cela devient presque banal. Vous créez une classe ProcessProtocol, et remplacer la méthode outReceived(). Twisted (en fonction du réacteur utilisé) est habituellement juste une grande boucle de select() avec callbacks installés pour traiter les données de différents descripteurs de fichiers (souvent des prises de réseau). Ainsi, la méthode outReceived() installe simplement un rappel pour les données provenant de la manipulation STDOUT. Un exemple simple montrant ce comportement est le suivant:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

Le documentation Twisted a quelques bonnes informations à ce sujet.

Si vous construisez votre application entière autour de Twisted, il rend la communication asynchrone avec d'autres processus, locaux ou distants, vraiment élégant comme ça. D'autre part, si votre programme ne se construit pas sur le dessus de Twisted, cela ne va pas vraiment être utile que. Espérons que cela peut être utile à d'autres lecteurs, même si elle n'est pas applicable à votre application.

Utilisez sélectionner et lire (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Pour readline () - comme:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Une solution consiste à faire un autre processus pour effectuer votre lecture du processus, ou faire un fil du processus avec un délai d'attente.

Voici la version filetée d'une fonction de temporisation:

http://code.activestate.com/recipes/473878/

Cependant, avez-vous besoin de lire le stdout comme il vient dans? Une autre solution peut être de vider la sortie dans un fichier et d'attendre la fin du processus en utilisant p.wait () .

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

Disclaimer: cela ne fonctionne que pour une tornade

Vous pouvez le faire en réglant le fd être bloquante puis utiliser ioloop pour enregistrer callbacks. J'ai emballé ce dans un œuf appelé tornado_subprocess et vous pouvez l'installer via PyPI:

easy_install tornado_subprocess

vous pouvez faire quelque chose comme ceci:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

vous pouvez également l'utiliser avec un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Les solutions existantes ne fonctionnent pas pour moi (détails ci-dessous). Ce qui a finalement travaillé consistait à mettre en œuvre à l'aide readline lecture (1) (basé sur cette réponse ). Celui-ci ne bloque pas:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Pourquoi les solutions existantes ne fonctionnent pas:

  1. Des solutions qui nécessitent readline (y compris ceux à base de file d'attente) bloquent toujours. Il est difficile (impossible?) De tuer le thread qui exécute readline. Il ne se fait tuer lorsque le processus qui l'a créé se termine, mais pas lorsque le processus de production de sortie est tué.
  2. Le mélange fcntl bas niveau avec des appels readline haut niveau peut ne pas fonctionner correctement comme anonnn a souligné.
  3. Utilisation select.poll () est propre, mais ne fonctionne pas sous Windows selon docs python.
  4. Utilisation de bibliothèques tierces semble trop pour cette tâche et ajoute des dépendances supplémentaires.

Cette version de non-bloquante ne pas ont besoin de modules spéciaux et travaillera hors-the-box à la majorité des distributions Linux.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

J'ajoute ce problème à lire certains stdout subprocess.Popen. Voici ma solution de lecture non bloquante:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

Voici mon code, utilisé pour attraper chaque sortie du sous-processus le plus tôt possible, y compris les lignes partielles. Il pompe en même temps et stdout et stderr pour presque correct.

Testé et travaillé correctement sur linux et windows Python 2.7.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

L'ajout de cette réponse ici car il fournit la capacité de fixer des tuyaux non-blocage sous Windows et Unix.

Tous les détails de ctypes sont grâce à @ techtonik de réponse.

Il existe une version légèrement modifiée pour être utilisé à la fois sur les systèmes Unix et Windows.

  • python3 compatible (seul changement mineur nécessaire) .
  • Comprend la version POSIX, et définit exception à utiliser pour les deux.

De cette façon, vous pouvez utiliser la même fonction et exception pour le code Unix et Windows.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

Pour éviter la lecture des données incomplètes, je fini par écrire mon propre générateur de readline (qui retourne la chaîne d'octets pour chaque ligne).

Son un générateur de sorte que vous pouvez par exemple ...

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Le sélectionner le module vous aide à déterminer où la prochaine entrée utile est.

Cependant, vous êtes presque toujours plus heureux avec des fils séparés. On ne l'a bloquante stdin, autre fait partout où il est vous ne voulez pas obstrués.

pourquoi prendre la peine fil et la file d'attente? contrairement à readline (), BufferedReader.read1 () bloque l'habitude d'attente pour \ r \ n, il retourne le plus tôt possible en cas de sortie venant.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

Dans mon cas je besoin d'un module de journalisation qui attire la sortie des applications d'arrière-plan et l'augmente (en ajoutant horodatées, couleurs, etc.).

I fini avec un fil de fond qui fait le I / O réelle. code suivant est uniquement pour les plates-formes POSIX. Je retirai les parties non essentielles.

Si quelqu'un va utiliser cette bête pendant de longues séries considèrent la gestion des descripteurs ouverts. Dans mon cas, ce n'était pas un gros problème.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

J'ai créé une bibliothèque basée sur J. F. solution de Sebastian . Vous pouvez l'utiliser.

https://github.com/cenkalti/what

travail de la réponse de Sebastian J. F., et plusieurs autres sources, je l'ai mis en place un simple gestionnaire de sous-processus. Il fournit la demande de lecture non bloquante, ainsi que l'exécution de plusieurs processus en parallèle. Il n'utilise un appel spécifique à l'OS (que je sache) et doivent donc travailler partout.

Il est disponible à partir pypi, donc juste pip install shelljob. Reportez-vous au projet rel="nofollow"> des exemples et des documents complets.

EDIT: Cette implémentation encore des blocs. répondre à la place.

J'ai essayé le top réponse, mais le risque supplémentaire et l'entretien du code de fil était inquiétant.

Recherche par le le module io ( et étant limité à 2,6), j'ai trouvé BufferedReader. Ceci est mon threadless, solution non-blocage.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Je suis récemment tombé sur le même problème Je dois lire une ligne au moment du flux (queue exécuté en sous-processus) en mode non-bloquant Je voulais éviter les problèmes suivants: ne pas brûler cpu, ne lisent pas le flux d'un octet (comme readline a), etc

Voici ma mise en œuvre https://gist.github.com/grubberr/5501e1a9760c3eab5e0a il ne prend pas en charge les fenêtres (sondage), ne pas manipuler EOF, mais cela fonctionne bien pour moi

Ceci est un exemple d'exécution de la commande interactive dans le sous-processus, et la sortie standard est interactive en utilisant un pseudo-terminal. Vous pouvez consulter: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Mon problème est un peu différent car je voulais recueillir à la fois stdout et stderr d'un processus en cours d'exécution, mais finalement le même que je voulais rendre la sortie dans un widget comme produit.

Je ne voulais pas avoir recours à un grand nombre des solutions de contournement proposées à l'aide Queues ou threads supplémentaires comme ils ne devraient pas être nécessaire d'effectuer une telle tâche commune que l'exécution d'un autre script et la collecte de sa sortie.

Après avoir lu les solutions proposées et docs python Je résolus mon problème avec la mise en œuvre ci-dessous. Oui, il ne fonctionne que pour que je Posix en utilisant l'appel de la fonction select.

Je suis d'accord que les documents sont confus et la mise en œuvre est gênant pour une telle tâche de script commun. Je crois que les anciennes versions de python ont différentes valeurs par défaut pour Popen et des explications différentes, de façon qui a créé beaucoup de confusion. Cela semble bien fonctionner pour les deux Python 2.7.12 et 3.5.2.

La clé était de mettre bufsize=1 en mémoire tampon de ligne et universal_newlines=True puis à traiter comme un fichier texte au lieu d'un binaire qui semble devenir la valeur par défaut lors de la mise bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERREUR, DEBUG et verbose sont simplement des macros qui impriment la sortie du terminal.

Cette solution est à mon humble avis 99,99% efficace car il utilise toujours la fonction readline de blocage, donc nous supposons que le sous-processus est agréable et sorties des lignes complètes.

Je me réjouis de vos commentaires pour améliorer la solution que je suis encore nouveau pour Python.

Cette solution utilise le module select à « lire toutes les données disponibles » à partir d'un flux IO. Cette fonction bloque initialement jusqu'à ce que des données sont disponibles, mais se lit comme suit alors que les données sont disponibles et ne bloque pas davantage.

Compte tenu du fait qu'il utilise le module select, cela ne fonctionne que sous Unix.

Le code est entièrement pep8 conforme.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

J'ai aussi fait face au problème décrit par Jesse et résolu en utilisant « select » comme Bradley , Andy d'autres l'ont fait, mais dans un mode de blocage pour éviter une boucle d'attente. Il utilise un tuyau factice comme stdin faux. Les blocs de sélection et d'attendre soit stdin ou le tuyau pour être prêt. Quand une touche est pressée stdin la sélection et débloque la valeur de clé peut être récupérée en lecture (1). Quand un autre thread écrit dans le tube puis le tuyau et débloque la sélection, il peut être considéré comme une indication que la nécessité de stdin est terminée. Voici un code de référence:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

Les choses sont beaucoup mieux en Python moderne.

Voici un programme simple d'enfant, "hello.py":

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

Et un programme d'interagir avec elle:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

qui affiche:

b'hello bob\n'
b'hello alice\n'

Notez que le motif réel, qui est aussi par la quasi-totalité des réponses précédentes, à la fois ici et dans des domaines connexes, est de mettre à la non-blocage et sondage ensuite le descripteur de fichier stdout de l'enfant dans une sorte de boucle select. Ces jours-ci, bien sûr, cette boucle est fourni par asyncio.

Voici un module qui prend en charge non-blocage lit et écrit en python fond:

https://pypi.python.org/pypi/python-nonblock

Fournit une fonction,

nonblock_read qui va lire les données à partir du flux, le cas échéant, sinon retourner une chaîne vide (ou None si le flux est fermé de l'autre côté et toutes les données possibles a été lu)

Vous pouvez également envisager le module python-subprocess2,

https://pypi.python.org/pypi/python-subprocess2

ce qui ajoute au module de sous-processus. Ainsi, sur l'objet retourné de « subprocess.Popen » ajoute une méthode supplémentaire, runInBackground. Cela démarre un thread et retourne un objet qui sera automatiquement rempli comme substance est écrit à stdout / stderr, sans bloquer votre thread principal.

Amusez-vous!

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top