pyserial — Как прочитать последнюю строку, отправленную с последовательного устройства
-
11-09-2019 - |
Вопрос
У меня есть Arduino, подключенный к моему компьютеру, который выполняет цикл, отправляя значение через последовательный порт обратно на компьютер каждые 100 мс.
Я хочу создать скрипт Python, который будет считывать данные из последовательного порта только каждые несколько секунд, поэтому я хочу, чтобы он просто видел последнее сообщение, отправленное с Arduino.
Как это сделать в Pyserial?
Вот код, который я пробовал, но он не работает.Он читает строки последовательно.
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
Решение
Возможно, я неправильно понимаю ваш вопрос, но поскольку это последовательная линия, вам придется читать все, отправленное с Arduino, последовательно — оно будет буферизовано в Arduino, пока вы его не прочитаете.
Если вы хотите иметь отображение статуса, показывающее последнее отправленное сообщение, используйте поток, который включает код в ваш вопрос (за исключением спящего режима), и сохраняйте последнюю полную строку, прочитанную как последнюю строку от Arduino.
Обновлять: mtasic
Пример кода довольно хорош, но если Arduino отправил частичную строку, когда inWaiting()
называется, вы получите усеченную строку.Вместо этого вы хотите поставить последний полный линия в last_received
, и сохраните частичную строку в buffer
чтобы его можно было добавить в следующий раз в цикле.Что-то вроде этого:
def receiving(ser):
global last_received
buffer_string = ''
while True:
buffer_string = buffer_string + ser.read(ser.inWaiting())
if '\n' in buffer_string:
lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
last_received = lines[-2]
#If the Arduino sends lots of empty lines, you'll lose the
#last filled line, so you could make the above statement conditional
#like so: if lines[-2]: last_received = lines[-2]
buffer_string = lines[-1]
Что касается использования readline()
:Вот что говорит документация Pyserial (слегка отредактированная для ясности и с упоминанием readlines()):
Будьте осторожны при использовании «readline».Укажите тайм -аут при открытии последовательного порта, в противном случае он может заблокировать навсегда, если не получен новый персонаж.Также обратите внимание, что «readlines ()» работает только с тайм -аутом.Это зависит от наличия тайм -аута и интерпретации, которые как EOF (конец файла).
что мне кажется вполне разумным!
Другие советы
from serial import *
from threading import Thread
last_received = ''
def receiving(ser):
global last_received
buffer = ''
while True:
# last_received = ser.readline()
buffer += ser.read(ser.inWaiting())
if '\n' in buffer:
last_received, buffer = buffer.split('\n')[-2:]
if __name__ == '__main__':
ser = Serial(
port=None,
baudrate=9600,
bytesize=EIGHTBITS,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1,
xonxoff=0,
rtscts=0,
interCharTimeout=None
)
Thread(target=receiving, args=(ser,)).start()
Эти решения будут перегружать процессор во время ожидания символов.
Вам следует выполнить хотя бы один блокирующий вызов для read(1)
while True:
if '\n' in buffer:
pass # skip if a line already in buffer
else:
buffer += ser.read(1) # this will block until one more char or timeout
buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
... и сделайте разделение, как и раньше.
Вы можете использовать ser.flushInput()
для очистки всех последовательных данных, которые в данный момент находятся в буфере.
После очистки старых данных вы можете использовать ser.readline(), чтобы получить самые последние данные с последовательного устройства.
Я думаю, что это немного проще, чем другие предложенные здесь решения.Мне помогло, надеюсь, вам подойдет.
Этот метод позволяет отдельно контролировать таймаут сбора всех данных для каждой строки и разный таймаут ожидания дополнительных строк.
# get the last line from serial port
lines = serial_com()
lines[-1]
def serial_com():
'''Serial communications: get a response'''
# open serial port
try:
serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
except serial.SerialException as e:
print("could not open serial port '{}': {}".format(com_port, e))
# read response from serial port
lines = []
while True:
line = serial_port.readline()
lines.append(line.decode('utf-8').rstrip())
# wait for new data after each line
timeout = time.time() + 0.1
while not serial_port.inWaiting() and timeout > time.time():
pass
if not serial_port.inWaiting():
break
#close the serial port
serial_port.close()
return lines
Вам понадобится цикл для чтения всего отправленного, при этом последний вызов readline() будет блокироваться до истечения времени ожидания.Так:
def readLastLine(ser):
last_data=''
while True:
data=ser.readline()
if data!='':
last_data=data
else:
return last_data
Небольшая модификация кода mtasic и Виная Саджипа:
Хотя я нашел этот код весьма полезным для аналогичного приложения, мне нужно было все линии, возвращающиеся от последовательного устройства, которое периодически отправляло информацию.
Я решил вытащить первый элемент сверху, записать его, а затем снова объединить оставшиеся элементы в качестве нового буфера и продолжить оттуда.
Я понимаю, что это нет то, о чем просил Грег, но я подумал, что об этом стоит рассказать в качестве примечания.
def receiving(ser):
global last_received
buffer = ''
while True:
buffer = buffer + ser.read(ser.inWaiting())
if '\n' in buffer:
lines = buffer.split('\n')
last_received = lines.pop(0)
buffer = '\n'.join(lines)
С использованием .inWaiting()
внутри бесконечного цикла может быть проблематично.Это может захватить всю Процессор в зависимости от реализации.Вместо этого я бы рекомендовал использовать определенный размер данных для чтения.В этом случае необходимо сделать, например, следующее:
ser.read(1024)
Слишком много сложностей
В чем причина разделения объекта байтов с помощью новой строки или других манипуляций с массивом?Я пишу самый простой метод, который решит вашу проблему:
import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
last = ''
for byte in s.read(s.inWaiting()): last += chr(byte)
if len(last) > 0:
# Do whatever you want with last
print (bytes(last, "utf-8"))
last = ''
Вот пример использования оболочки, которая позволяет вам читать самую последнюю строку без 100% загрузки ЦП.
class ReadLine:
"""
pyserial object wrapper for reading line
source: https://github.com/pyserial/pyserial/issues/216
"""
def __init__(self, s):
self.buf = bytearray()
self.s = s
def readline(self):
i = self.buf.find(b"\n")
if i >= 0:
r = self.buf[:i + 1]
self.buf = self.buf[i + 1:]
return r
while True:
i = max(1, min(2048, self.s.in_waiting))
data = self.s.read(i)
i = data.find(b"\n")
if i >= 0:
r = self.buf + data[:i + 1]
self.buf[0:] = data[i + 1:]
return r
else:
self.buf.extend(data)
s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
print(device.readline())