pyserial - シリアルデバイスから送信された最後の行を読み取る方法
-
11-09-2019 - |
質問
Arduino をコンピュータに接続してループを実行し、100 ミリ秒ごとにシリアル ポート経由で値をコンピュータに送り返しています。
シリアルポートから数秒ごとにのみ読み取るPythonスクリプトを作成したいので、Arduinoから最後に送信されたものだけを確認したいと考えています。
Pyserial ではどうやってこれを行うのでしょうか?
これが私が試したコードですが、機能しませんでした。行を順番に読み取ります。
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
解決
おそらく質問を誤解していると思いますが、シリアルラインであるため、Arduino から送信されたすべてのものを順番に読み取る必要があります。読み取るまで Arduino にバッファリングされます。
最新の送信内容を示すステータス表示が必要な場合は、質問にコードを組み込んだスレッドを使用し(スリープを除く)、最後の完全な行を Arduino からの最新行として読み取ったままにしておきます。
アップデート: mtasic
のサンプルコードは非常に優れていますが、Arduino が部分的な行を送信した場合、 inWaiting()
を呼び出すと、切り詰められた行が表示されます。代わりに、あなたがやりたいのは、最後に 完了 ラインに入る last_received
, 、部分的な行をそのままにしておきます buffer
これにより、ループの次回に追加できるようになります。このようなもの:
def receiving(ser):
global last_received
buffer_string = ''
while True:
buffer_string = buffer_string + ser.read(ser.inWaiting())
if '\n' in buffer_string:
lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
last_received = lines[-2]
#If the Arduino sends lots of empty lines, you'll lose the
#last filled line, so you could make the above statement conditional
#like so: if lines[-2]: last_received = lines[-2]
buffer_string = lines[-1]
ご利用に関して readline()
:Pyserial のドキュメントに記載されている内容は次のとおりです (わかりやすくするために少し編集し、readlines() についても言及しています)。
「readline」を使用する場合は注意してください。シリアルポートを開くときにタイムアウトを指定してください。そうしないと、新しいライン文字が受信されない場合は永遠にブロックされる可能性があります。また、「readlines()」はタイムアウトでのみ動作することに注意してください。タイムアウトを持つことに依存し、それをEOF(ファイルの終了)として解釈します。
それは私にとって非常に合理的だと思われます!
他のヒント
from serial import *
from threading import Thread
last_received = ''
def receiving(ser):
global last_received
buffer = ''
while True:
# last_received = ser.readline()
buffer += ser.read(ser.inWaiting())
if '\n' in buffer:
last_received, buffer = buffer.split('\n')[-2:]
if __name__ == '__main__':
ser = Serial(
port=None,
baudrate=9600,
bytesize=EIGHTBITS,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1,
xonxoff=0,
rtscts=0,
interCharTimeout=None
)
Thread(target=receiving, args=(ser,)).start()
これらのソリューションは、CPUを占有します。
あなたが読むために少なくとも一つのブロッキング呼び出しを行う必要があります(1)
while True:
if '\n' in buffer:
pass # skip if a line already in buffer
else:
buffer += ser.read(1) # this will block until one more char or timeout
buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
...前と同様に分割されたことを行うます。
あなたは、バッファ内に現在あるすべてのシリアル・データをフラッシュするser.flushInput()
を使用することができます。
は、古いデータを消去した後、あなたはユーザーser.readline()は、シリアルデバイスから最新のデータを取得することができます。
私はここに他の提案された解決策よりもその少し単純だと思います。私のために働いた、それはあなたのために適し願っています。
この方法では、別途追加の行で待機しているため、各ラインのすべてのデータを収集するためのタイムアウト、および異なるタイムアウトを制御することができます。
# get the last line from serial port
lines = serial_com()
lines[-1]
def serial_com():
'''Serial communications: get a response'''
# open serial port
try:
serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
except serial.SerialException as e:
print("could not open serial port '{}': {}".format(com_port, e))
# read response from serial port
lines = []
while True:
line = serial_port.readline()
lines.append(line.decode('utf-8').rstrip())
# wait for new data after each line
timeout = time.time() + 0.1
while not serial_port.inWaiting() and timeout > time.time():
pass
if not serial_port.inWaiting():
break
#close the serial port
serial_port.close()
return lines
あなたは、タイムアウトするまでブロックする)(readlineのために最後の呼び出しで、送信されたすべてのものを読むためにループが必要になります。だから、ます:
def readLastLine(ser):
last_data=''
while True:
data=ser.readline()
if data!='':
last_data=data
else:
return last_data
mtasic&ビナイSajipのコードに若干の変更:
私は同様のアプリケーションのために私には非常に役立つ、このコードを見つけましたが、私は必要なのすべてのの情報を定期的に送信し、シリアルデバイスから戻ってくるの線ます。
Iは、上から最初の要素をポップし、それを記録し、新しいバッファとして残りの要素を再結合し、そこから継続することを選択します。
私は、これは にないグレッグが求めていたものであることを認識し、私はそれがサイドノートとして共有する価値だと思っています。
def receiving(ser):
global last_received
buffer = ''
while True:
buffer = buffer + ser.read(ser.inWaiting())
if '\n' in buffer:
lines = buffer.split('\n')
last_received = lines.pop(0)
buffer = '\n'.join(lines)
無限ループ内.inWaiting()
を使用すると、問題となり得ます。これは、実装によっては全体 CPU をアップ豚があります。その代わり、私が読み込まれるデータの具体的なサイズを使用することをお勧めします。したがって、この場合には、以下の例のために行われる必要があります:
ser.read(1024)
をあまりにも多くの合併症の
改行または他の配列操作によってbytesオブジェクトを分割する理由は何ですか? 私はあなたの問題を解決する最も簡単な方法を、記述します:
import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
last = ''
for byte in s.read(s.inWaiting()): last += chr(byte)
if len(last) > 0:
# Do whatever you want with last
print (bytes(last, "utf-8"))
last = ''
ここでは、100%のCPUなしで最新のラインを読むことができますラッパーを使用した例です。
class ReadLine:
"""
pyserial object wrapper for reading line
source: https://github.com/pyserial/pyserial/issues/216
"""
def __init__(self, s):
self.buf = bytearray()
self.s = s
def readline(self):
i = self.buf.find(b"\n")
if i >= 0:
r = self.buf[:i + 1]
self.buf = self.buf[i + 1:]
return r
while True:
i = max(1, min(2048, self.s.in_waiting))
data = self.s.read(i)
i = data.find(b"\n")
if i >= 0:
r = self.buf + data[:i + 1]
self.buf[0:] = data[i + 1:]
return r
else:
self.buf.extend(data)
s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
print(device.readline())