Pythonのサブプロセス.PIPEでのノンブロッキング読み取り
-
22-08-2019 - |
質問
私が使用しているのは、 サブプロセスモジュール サブプロセスを開始し、その出力ストリーム (stdout) に接続します。標準出力でノンブロッキング読み取りを実行できるようにしたいと考えています。.readline を非ブロッキングにする方法、または呼び出す前にストリームにデータがあるかどうかを確認する方法はありますか .readline
?これが移植可能であるか、少なくとも Windows と Linux で動作することを望みます。
今のところ私がそれを行う方法は次のとおりです( .readline
データが利用できない場合):
p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()
解決
fcntl
, select
, asyncproc
この場合は役に立ちません。
オペレーティング システムに関係なく、ブロックせずにストリームを読み取る信頼性の高い方法は、次のとおりです。 Queue.get_nowait()
:
import sys
from subprocess import PIPE, Popen
from threading import Thread
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
for line in iter(out.readline, b''):
queue.put(line)
out.close()
p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()
# ... do other things here
# read line without blocking
try: line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
print('no output yet')
else: # got line
# ... do something with line
他のヒント
私は、多くの場合、同様の問題がありました。私が書くPythonプログラムは、頻繁に同時にコマンドライン入力(stdin)からのユーザ入力を受け入れながら、いくつかの主要な機能を実行する能力を持っている必要があります。単に別のスレッドでの処理機能をユーザ入力がreadline()
ブロックため、問題を解決し、タイムアウトを持っていません置きます。主な機能は完全ではありませんし、さらにユーザの入力を待つ必要性がなくなった場合、私は通常、私のプログラムを終了したいが、それはreadline()
はまだラインを待っている他のスレッドでブロックしていることができないため。私はこの問題に発見したソリューションは、fcntlのモジュールを使用して非ブロックファイルを標準入力にすることです。
import fcntl
import os
import sys
# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# user input handling thread
while mainThreadIsRunning:
try: input = sys.stdin.readline()
except: continue
handleInput(input)
私の意見では、これは、この問題を解決するために選択または信号モジュールを使用するよりも少しきれいですが、その後、再び、それは唯一のUNIX上で動作します...
Python 3.4 には新しい機能が導入されています 暫定API 非同期 IO の場合 -- asyncio
モジュール.
アプローチは次のようなものです twisted
@Bryan Wardによる-ベースの回答 -- プロトコルを定義すると、データの準備が整うとすぐにそのメソッドが呼び出されます。
#!/usr/bin/env python3
import asyncio
import os
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
print(data)
def connection_lost(self, exc):
loop.stop() # end loop.run_forever()
if os.name == 'nt':
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol,
"myprogram.exe", "arg1", "arg2"))
loop.run_forever()
finally:
loop.close()
見る ドキュメントの「サブプロセス」.
高レベルのインターフェイスがあります asyncio.create_subprocess_exec()
それは戻ってきます Process
オブジェクト これにより、非同期で行を読み取ることができます。 StreamReader.readline()
コルーチン (と async
/await
Python 3.5 以降の構文):
#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing
async def readline_and_kill(*args):
# start child process
process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
# read line (sequence of bytes ending with b'\n') asynchronously
async for line in process.stdout:
print("got line:", line.decode(locale.getpreferredencoding(False)))
break
process.kill()
return await process.wait() # wait for the child process to exit
if sys.platform == "win32":
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
with closing(loop):
sys.exit(loop.run_until_complete(readline_and_kill(
"myprogram.exe", "arg1", "arg2")))
readline_and_kill()
次のタスクを実行します。
- サブプロセスを開始し、標準出力をパイプにリダイレクトします
- サブプロセスの標準出力から非同期で行を読み取ります
- サブプロセスを強制終了します
- 終了するのを待ちます
必要に応じて、各ステップをタイムアウト秒数で制限できます。
asyncproc のモジュールを試してみてください。たとえばます:
import os
from asyncproc import Process
myProc = Process("myprogram.app")
while True:
# check to see if process has ended
poll = myProc.wait(os.WNOHANG)
if poll != None:
break
# print any new output
out = myProc.read()
if out != "":
print out
S.Lottによって示唆されているように、モジュールは、すべてのスレッドの世話をします。
あなたはツイストの中で、本当に簡単にこれを行うことができます。既存のコードベースに応じて、これは使用することは容易ではないかもしれませんが、あなたはねじれたアプリケーションを構築している場合、このようなことはほとんど自明になります。あなたはProcessProtocol
クラスを作成し、outReceived()
メソッドをオーバーライドします。ツイスト(使用される反応器に依存する)は、通常、異なるファイルディスクリプタ(多くの場合、ネットワークソケット)からのデータを処理するためにインストールしたコールバックを持つだけ大きなselect()
ループです。だから、outReceived()
方法は、単純にSTDOUT
からのデータを処理するためのコールバックをインストールしています。次のようにこの動作を実証する簡単な例があります:
from twisted.internet import protocol, reactor
class MyProcessProtocol(protocol.ProcessProtocol):
def outReceived(self, data):
print data
proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()
この上でいくつかの良い情報を持っていのツイストドキュメントを。
あなたはツイストの周りにあなたの全体のアプリケーションを構築する場合、それはこのような本当にエレガントなローカルまたはリモートの他のプロセス、との非同期通信を行います。あなたのプログラムは、ツイストの上に構築されていない場合、一方、これは本当に便利になるだろうされていません。うまくいけば、これは特定のアプリケーションに適用できない場合でも、他の読者に役立つことができます。
を使用するには、(1)を選択&お読みください。
import subprocess #no new requirements
def readAllSoFar(proc, retVal=''):
while (select.select([proc.stdout],[],[],0)[0]!=[]):
retVal+=proc.stdout.read(1)
return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
print (readAllSoFar(p))
用のreadline() - のような:
lines = ['']
while not p.poll():
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
print a
解決策の 1 つは、プロセスの読み取りを実行する別のプロセスを作成するか、タイムアウトのあるプロセスのスレッドを作成することです。
タイムアウト関数のスレッド版は次のとおりです。
http://code.activestate.com/recipes/473878/
ただし、標準出力が受信されるときにそれを読み取る必要がありますか?別の解決策は、出力をファイルにダンプし、プロセスが終了するのを待つことです。 p.wait().
f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()
str = open('myprogram_output.txt','r').read()
免責事項:これは竜巻のためにのみ機能します。
あなたはノンブロッキングなり、その後、コールバックを登録するioloop使用するFDを設定することにより、これを行うことができます。私は tornado_subprocess に呼ばれる卵でこれをパッケージ化している、あなたはは、PyPI経由でインストールすることができます:
easy_install tornado_subprocess
今、あなたはこのような何かを行うことができます:
import tornado_subprocess
import tornado.ioloop
def print_res( status, stdout, stderr ) :
print status, stdout, stderr
if status == 0:
print "OK:"
print stdout
else:
print "ERROR:"
print stderr
t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()
あなたものRequestHandlerでそれを使用することができます。
class MyHandler(tornado.web.RequestHandler):
def on_done(self, status, stdout, stderr):
self.write( stdout )
self.finish()
@tornado.web.asynchronous
def get(self):
t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
既存のソリューションは私にとってはうまくいきませんでした (詳細は下記を参照)。最終的に機能したのは、read(1) (に基づいて) を使用して readline を実装することでした。 この答え)。後者はブロックしません。
from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
nextline = None
buf = ''
while True:
#--- extract line using read(1)
out = myprocess.stdout.read(1)
if out == '' and myprocess.poll() != None: break
if out != '':
buf += out
if out == '\n':
nextline = buf
buf = ''
if not nextline: continue
line = nextline
nextline = None
#--- do whatever you want with line here
print 'Line is:', line
myprocess.stdout.close()
myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()
#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
myprocess.kill()
myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
p1.join()
既存のソリューションが機能しなかった理由:
- readline を必要とするソリューション (キューベースのものを含む) は常にブロックされます。readlineを実行するスレッドを強制終了するのは困難(不可能?)です。これは、それを作成したプロセスが終了したときにのみ強制終了されますが、出力を生成するプロセスが強制終了された場合には強制終了されません。
- anonnn が指摘したように、低レベルの fcntl と高レベルの readline 呼び出しを混在させると、正しく動作しない可能性があります。
- select.poll() の使用は便利ですが、Python のドキュメントによると Windows では機能しません。
- サードパーティのライブラリを使用するのは、このタスクにはやりすぎであり、依存関係が追加される可能性があります。
読み、ノンブロッキングのこのバージョンのはの特別なモジュールを必要とせず、Linuxディストリビューションの大半にアウトオブボックスに動作します。
import os
import sys
import time
import fcntl
import subprocess
def async_read(fd):
# set non-blocking flag while preserving old flags
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
# read char until EOF hit
while True:
try:
ch = os.read(fd.fileno(), 1)
# EOF
if not ch: break
sys.stdout.write(ch)
except OSError:
# waiting for data be available on fd
pass
def shell(args, async=True):
# merge stderr and stdout
proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if async: async_read(proc.stdout)
sout, serr = proc.communicate()
return (sout, serr)
if __name__ == '__main__':
cmd = 'ping 8.8.8.8'
sout, serr = shell(cmd.split())
私はいくつかのsubprocess.Popen標準出力を読み取るために、この問題を追加します。 ここに私の非ブロッキング読み取りソリューションがあります:
import fcntl
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return ""
# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()
# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
ここで、部分的ラインを含むできるだけ早くサブプロセスからのすべての出力をキャッチするために使用され、私のコードです。それはほとんど正しい順序で同じ時間とstdoutとstderrにポンプます。
テストされ、正しくはPython 2.7のLinuxおよびWindows上で働いていました。
#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
if (len(argv) > 1) and (argv[-1] == "-sub-"):
import time, sys
print "Application runned!"
time.sleep(2)
print "Slept 2 second"
time.sleep(1)
print "Slept 1 additional second",
time.sleep(2)
sys.stderr.write("Stderr output after 5 seconds")
print "Eol on stdin"
sys.stderr.write("Eol on stderr\n")
time.sleep(1)
print "Wow, we have end of work!",
else:
os.environ["PYTHONUNBUFFERED"]="1"
try:
p = Popen( argv + ["-sub-"],
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
except WindowsError, W:
if W.winerror==193:
p = Popen( argv + ["-sub-"],
shell=True, # Try to run via shell
bufsize=0, # line-buffered
stdin=PIPE, stdout=PIPE, stderr=PIPE )
else:
raise
inp = Queue.Queue()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
def Pump(stream, category):
queue = Queue.Queue()
def rdr():
while True:
buf = stream.read1(8192)
if len(buf)>0:
queue.put( buf )
else:
queue.put( None )
return
def clct():
active = True
while active:
r = queue.get()
try:
while True:
r1 = queue.get(timeout=0.005)
if r1 is None:
active = False
break
else:
r += r1
except Queue.Empty:
pass
inp.put( (category, r) )
for tgt in [rdr, clct]:
th = Thread(target=tgt)
th.setDaemon(True)
th.start()
Pump(sout, 'stdout')
Pump(serr, 'stderr')
while p.poll() is None:
# App still working
try:
chan,line = inp.get(timeout = 1.0)
if chan=='stdout':
print "STDOUT>>", line, "<?<"
elif chan=='stderr':
print " ERROR==", line, "=?="
except Queue.Empty:
pass
print "Finish"
if __name__ == '__main__':
__main__()
Windows と Unix でノンブロッキング パイプを設定する機能が提供されるため、この回答をここに追加します。
全ての ctypes
詳細はおかげさまで @techtonikさんの答え.
Unix と Windows システムの両方で使用できるようにわずかに変更されたバージョンがあります。
- Python3互換 (軽微な変更のみが必要です).
- posix バージョンが含まれており、どちらかに使用する例外を定義します。
こうすることで、Unix と Windows コードで同じ関数と例外を使用できます。
# pipe_non_blocking.py (module)
"""
Example use:
p = subprocess.Popen(
command,
stdout=subprocess.PIPE,
)
pipe_non_blocking_set(p.stdout.fileno())
try:
data = os.read(p.stdout.fileno(), 1)
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
"""
__all__ = (
"pipe_non_blocking_set",
"pipe_non_blocking_is_error_blocking",
"PortableBlockingIOError",
)
import os
if os.name == "nt":
def pipe_non_blocking_set(fd):
# Constant could define globally but avoid polluting the name-space
# thanks to: https://stackoverflow.com/questions/34504970
import msvcrt
from ctypes import windll, byref, wintypes, WinError, POINTER
from ctypes.wintypes import HANDLE, DWORD, BOOL
LPDWORD = POINTER(DWORD)
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
def pipe_no_wait(pipefd):
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
SetNamedPipeHandleState.restype = BOOL
h = msvcrt.get_osfhandle(pipefd)
res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
if res == 0:
print(WinError())
return False
return True
return pipe_no_wait(fd)
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
from ctypes import GetLastError
ERROR_NO_DATA = 232
return (GetLastError() == ERROR_NO_DATA)
PortableBlockingIOError = OSError
else:
def pipe_non_blocking_set(fd):
import fcntl
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
return True
def pipe_non_blocking_is_error_blocking(ex):
if not isinstance(ex, PortableBlockingIOError):
return False
return True
PortableBlockingIOError = BlockingIOError
不完全なデータの読み取りを避けるために、私は独自の readline ジェネレーター (各行のバイト文字列を返す) を作成することになりました。
ジェネレーターなので、たとえば...
def non_blocking_readlines(f, chunk=1024):
"""
Iterate over lines, yielding b'' when nothings left
or when new data is not yet available.
stdout_iter = iter(non_blocking_readlines(process.stdout))
line = next(stdout_iter) # will be a line or b''.
"""
import os
from .pipe_non_blocking import (
pipe_non_blocking_set,
pipe_non_blocking_is_error_blocking,
PortableBlockingIOError,
)
fd = f.fileno()
pipe_non_blocking_set(fd)
blocks = []
while True:
try:
data = os.read(fd, chunk)
if not data:
# case were reading finishes with no trailing newline
yield b''.join(blocks)
blocks.clear()
except PortableBlockingIOError as ex:
if not pipe_non_blocking_is_error_blocking(ex):
raise ex
yield b''
continue
while True:
n = data.find(b'\n')
if n == -1:
break
yield b''.join(blocks) + data[:n + 1]
data = data[n + 1:]
blocks.clear()
blocks.append(data)
私は、元の質問者の問題を持っていますが、スレッドを起動したくありませんでした。私は、パイプからの直接読み取り()とジェシーのソリューションを混合し、ラインのための私自身のバッファ・ハンドラは、読み込み(ただし、私のサブプロセス - pingは - 常にフルライン<システムのページサイズを書きました)。私は唯一のGObject-登録IO時計に読み取ることによって、ビジー待ちを避けます。このごろ私は通常、スレッドを避けるために、GObjectのメインループ内のコードを実行します。
def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down
ウォッチャーである
def watch(f, *other):
print 'reading',f.read()
return True
そして、メインプログラムがpingを設定して、GObjectのメールループを呼び出します。
def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()
他の作業はGObjectの中のコールバックに添付されます。
なぜわざわざスレッド&キュー? readlineの()とは異なり、BufferedReader.read1()は習慣に来る任意の出力がある場合、それはできるだけ早く返し、\ r \ nのを待ってブロックします。
#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io
def __main__():
try:
p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
except: print("Popen failed"); quit()
sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
while True:
buf = sout.read1(1024)
if len(buf) == 0: break
print buf,
if __name__ == '__main__':
__main__()
私の場合は、(タイムスタンプ、色、などを追加すること。)バックグラウンドアプリケーションからの出力をキャッチし、それを補強するロギングモジュールを必要に応じています。
私は、実際のI / Oを行い、バックグラウンドスレッドになってしまいました。コードに続いて、POSIXプラットフォームのためにだけです。私は非本質的な部分を除去しました。
誰かがロングランがオープンディスクリプタを管理考えるため、この獣を使用しようとした場合。私の場合、それは大きな問題ではありませんでした。
# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess
class Logger(threading.Thread):
def __init__(self, *modules):
threading.Thread.__init__(self)
try:
from select import epoll, EPOLLIN
self.__poll = epoll()
self.__evt = EPOLLIN
self.__to = -1
except:
from select import poll, POLLIN
print 'epoll is not available'
self.__poll = poll()
self.__evt = POLLIN
self.__to = 100
self.__fds = {}
self.daemon = True
self.start()
def run(self):
while True:
events = self.__poll.poll(self.__to)
for fd, ev in events:
if (ev&self.__evt) != self.__evt:
continue
try:
self.__fds[fd].run()
except Exception, e:
print e
def add(self, fd, log):
assert not self.__fds.has_key(fd)
self.__fds[fd] = log
self.__poll.register(fd, self.__evt)
class log:
logger = Logger()
def __init__(self, name):
self.__name = name
self.__piped = False
def fileno(self):
if self.__piped:
return self.write
self.read, self.write = os.pipe()
fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
self.fdRead = os.fdopen(self.read)
self.logger.add(self.read, self)
self.__piped = True
return self.write
def __run(self, line):
self.chat(line, nl=False)
def run(self):
while True:
try: line = self.fdRead.readline()
except IOError, exc:
if exc.errno == errno.EAGAIN:
return
raise
self.__run(line)
def chat(self, line, nl=True):
if nl: nl = '\n'
else: nl = ''
sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))
def system(command, param=[], cwd=None, env=None, input=None, output=None):
args = [command] + param
p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
p.wait()
ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)
date = log('date')
date.chat('go')
system("date", output=date)
私は J. F.セバスチャンのソリューションに。あなたはそれを使用することができます。
J.F.セバスチャンの答え、およびいくつかの他のソースからの作業、私は一緒に簡単なサブプロセスマネージャを入れています。それは同様に、並列に複数のプロセスを実行するよう、要求非ブロッキング読み取りを提供します。それは(私は承知していることを)任意のOS固有のコールを使用していないので、どこでも動作するはずです。
これは、これだけpip install shelljob
は、PyPIから入手可能です。例と完全なドキュメントのためのプロジェクトページを参照してください。
EDIT:この実装はまだブロック。 J.F.Sebastianのに答えるhref="https://stackoverflow.com/a/4896288/2359288">使用します。
<ストライキ>私はトップ解答を試みたが、スレッドコードの追加のリスクとメンテナンスが気になりました。ストライキ>
<ストライキ> ストライキ><ストライキ> IOモジュールの(経由見ますそして、)2.6に限定され、私はBufferedReaderのを見つけました。これは私のスレッドレス、非ブロッキング溶液である。ストライキ>
import io
from subprocess import PIPE, Popen
p = Popen(['myprogram.exe'], stdout=PIPE)
SLEEP_DELAY = 0.001
# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
while p.poll() == None:
time.sleep(SLEEP_DELAY)
while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
line = buffer.readline()
# do stuff with the line
# Handle any remaining output after the process has ended
while buffer.peek():
line = buffer.readline()
# do stuff with the line
私は最近、同じ問題につまずきました 私は、ストリーム(サブプロセスで実行尾)から一度に1行を読む必要があります 非ブロックモードで
など、CPUを燃やすしない、(readlineのが行ったように)1バイトのストリームを読んでいない:私は次の問題を回避したかったですここに私の実装です https://gist.github.com/grubberr/5501e1a9760c3eab5e0aする それはEOFを処理しない、窓(投票を)サポートしていません、 しかし、それはうまく私のために動作します。
このサブプロセスに対話型コマンドを実行するための一例であり、STDOUTは疑似端末を使用して対話型です。 https://stackoverflow.com/a/43012138/3555925 の
:あなたはを参照することができます#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen
command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()
# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()
# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
preexec_fn=os.setsid,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
universal_newlines=True)
while p.poll() is None:
r, w, e = select.select([sys.stdin, master_fd], [], [])
if sys.stdin in r:
d = os.read(sys.stdin.fileno(), 10240)
os.write(master_fd, d)
elif master_fd in r:
o = os.read(master_fd, 10240)
if o:
os.write(sys.stdout.fileno(), o)
# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
私の問題は、私はその生成されたウィジェットで出力をレンダリングしたかったので、私は、実行中のプロセスからのstdoutとstderrの両方を収集したいと少し異なりますが、最終的には同じ。
私は、彼らが別のスクリプトを実行し、その出力を集めるような一般的なタスクを実行する必要はありませんようキューまたは追加のスレッドを使用して提案した回避策の多くに頼る必要はありませんでした。
提案されたソリューションとPythonのドキュメントを読んだ後、私は以下の実装と私の問題を解決しました。私はselect
関数呼び出しを使用しているとはい、それは唯一のPOSIXのために働きます。
私は、ドキュメントが混乱しており、実装は、このような一般的なスクリプトタスクに厄介であることに同意します。私は、Pythonの古いバージョンがPopen
と混乱の多くを作成したので、別の説明については、異なるデフォルト値を持っていると信じています。これは、Python 2.7.12と3.5.2の両方でうまく動作するようです。
キーは、ラインバッファリングのためのbufsize=1
を設定し、代わりにuniversal_newlines=True
を設定するときにデフォルトになりそうなバイナリのテキストファイルとして処理するbufsize=1
することでした。
class workerThread(QThread):
def __init__(self, cmd):
QThread.__init__(self)
self.cmd = cmd
self.result = None ## return code
self.error = None ## flag indicates an error
self.errorstr = "" ## info message about the error
def __del__(self):
self.wait()
DEBUG("Thread removed")
def run(self):
cmd_list = self.cmd.split(" ")
try:
cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
, universal_newlines=True
, stderr=subprocess.PIPE
, stdout=subprocess.PIPE)
except OSError:
self.error = 1
self.errorstr = "Failed to execute " + self.cmd
ERROR(self.errorstr)
finally:
VERBOSE("task started...")
import select
while True:
try:
r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
if cmd.stderr in r:
line = cmd.stderr.readline()
if line != "":
line = line.strip()
self.emit(SIGNAL("update_error(QString)"), line)
if cmd.stdout in r:
line = cmd.stdout.readline()
if line == "":
break
line = line.strip()
self.emit(SIGNAL("update_output(QString)"), line)
except IOError:
pass
cmd.wait()
self.result = cmd.returncode
if self.result < 0:
self.error = 1
self.errorstr = "Task terminated by signal " + str(self.result)
ERROR(self.errorstr)
return
if self.result:
self.error = 1
self.errorstr = "exit code " + str(self.result)
ERROR(self.errorstr)
return
return
ERROR、DEBUGとVERBOSE単に端末に出力を印刷するマクロです。
それはまだブロッキングreadline
機能を使用するため、我々は、サブプロセスがいいと出力の完全なラインであると仮定して、このソリューションは、私見99.99%効果的です。
私は、私は、Pythonにまだ新しいですとソリューションを改善するためのフィードバックを歓迎します。
この溶液は、IOストリームから「任意の利用可能なデータを読み取る」にselect
モジュールを使用します。当初、これは、関数ブロックデータが利用可能であるが、その後利用可能であり、さらにブロックされないデータだけを読み込みます。
それはselect
モジュールを使用するという事実を考えると、これはUnixでのみ動作します。
のコードが完全にPEP8に準拠しています。
import select
def read_available(input_stream, max_bytes=None):
"""
Blocks until any data is available, then all available data is then read and returned.
This function returns an empty string when end of stream is reached.
Args:
input_stream: The stream to read from.
max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.
Returns:
str
"""
# Prepare local variables
input_streams = [input_stream]
empty_list = []
read_buffer = ""
# Initially block for input using 'select'
if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:
# Poll read-readiness using 'select'
def select_func():
return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0
# Create while function based on parameters
if max_bytes is not None:
def while_func():
return (len(read_buffer) < max_bytes) and select_func()
else:
while_func = select_func
while True:
# Read single byte at a time
read_data = input_stream.read(1)
if len(read_data) == 0:
# End of stream
break
# Append byte to string buffer
read_buffer += read_data
# Check if more data is available
if not while_func():
break
# Return read buffer
return read_buffer
私も、によって説明された問題に直面しました ジェシー 「選択」を次のように使用して解決しました ブラッドリー, アンディ 他の人はそうしましたが、ビジーループを避けるためにブロッキングモードでした。ダミーのパイプを偽の標準入力として使用します。選択はブロックされ、標準入力またはパイプの準備ができるまで待機します。キーが押されると、標準入力は選択のブロックを解除し、キー値は read(1) で取得できます。別のスレッドがパイプに書き込むと、パイプは選択のブロックを解除し、stdin の必要性がなくなったことを示すものとみなすことができます。以下に参照コードを示します。
import sys
import os
from select import select
# -------------------------------------------------------------------------
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")
# -------------------------------------------------------------------------
def getKey():
# Wait for stdin or pipe (fake stdin) to be ready
dr,dw,de = select([sys.__stdin__, readFile], [], [])
# If stdin is the one ready then read it and return value
if sys.__stdin__ in dr:
return sys.__stdin__.read(1) # For Windows use ----> getch() from module msvcrt
# Must finish
else:
return None
# -------------------------------------------------------------------------
def breakStdinRead():
writeFile.write(' ')
writeFile.flush()
# -------------------------------------------------------------------------
# MAIN CODE
# Get key stroke
key = getKey()
# Keyboard input
if key:
# ... do your stuff with the key value
# Faked keystroke
else:
# ... use of stdin finished
# -------------------------------------------------------------------------
# OTHER THREAD CODE
breakStdinRead()
物事は近代的なPythonで多くの優れています。
ここでは、単純な子プログラムだ、 "hello.py" ます:
#!/usr/bin/env python3
while True:
i = input()
if i == "quit":
break
print(f"hello {i}")
そして、それと対話するためのプログラムます:
import asyncio
async def main():
proc = await asyncio.subprocess.create_subprocess_exec(
"./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b"bob\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"alice\n")
print(await proc.stdout.read(1024))
proc.stdin.write(b"quit\n")
await proc.wait()
asyncio.run(main())
これは出力します:
b'hello bob\n'
b'hello alice\n'
、ほぼすべての以前の回答のことでも両方ここに、関連の質問にある実際のパターンは、非ブロックに子供のstdoutファイルディスクリプタを設定し、selectループのいくつかの並べ替えで、それをポーリングすることであることに注意してください。最近では、当然のことながら、そのループがasyncioによって提供されます。
ここでノンブロッキングをサポートするモジュールを読み込んで、背景はpythonで書き込まれます:
https://pypi.python.org/pypi/python-nonblockする
機能を提供し、
利用可能な場合、ストリームからデータを読み取りますnonblock_read、(流れが反対側に閉じられ、すべての可能なデータが読み込まれた場合、またはなし)そうでなければ空の文字列を返す
また、
、のpython-subprocess2モジュールを考慮することができますhttps://pypi.python.org/pypi/python-subprocess2する
これは、サブプロセス・モジュールに追加されます。だから、「subprocess.Popen」から返されたオブジェクトに追加する方法、runInBackgroundが追加されます。これは、スレッドを開始して、あなたのメインスレッドをブロックすることなく、自動的に標準出力/標準エラー出力に書き込まれているものとして読み込まれるオブジェクトを返します。
お楽しみください!