PySerial - Cómo leer la última línea enviada desde un dispositivo serie
-
11-09-2019 - |
Pregunta
Tengo un Arduino conectado a mi equipo que ejecuta un bucle, el envío de un valor través del puerto serie de vuelta al ordenador cada 100 ms.
Quiero hacer un script en Python que leerá desde el puerto serie sólo cada pocos segundos, por lo que yo quiero que acaba de ver la última cosa enviada desde el Arduino.
¿Cómo se hace esto en PySerial?
Este es el código Probé el que does't trabajo. Se lee las líneas secuencialmente.
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
Solución
Tal vez estoy malentendido su pregunta, pero ya que es una línea serie, que tendrá que leer todo lo enviado desde la secuencialmente Arduino -. Que va a ser amortiguada en el Arduino, hasta que lo lea
Si usted quiere tener una pantalla de estado que muestra lo último enviado -. Utilizar un hilo que incorpora el código en su pregunta (menos el sueño), y tener la última línea completa leer como la última línea de la Arduino
Actualización: código de ejemplo de mtasic
es bastante bueno, pero si el Arduino ha enviado una línea parcial cuando inWaiting()
se llama, obtendrá una línea truncada. En cambio, lo que quiere hacer es poner el último línea completa en last_received
, y mantener la línea parcial en buffer
de manera que se puede añadir a la próxima vez que el bucle. Algo como esto:
def receiving(ser):
global last_received
buffer_string = ''
while True:
buffer_string = buffer_string + ser.read(ser.inWaiting())
if '\n' in buffer_string:
lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
last_received = lines[-2]
#If the Arduino sends lots of empty lines, you'll lose the
#last filled line, so you could make the above statement conditional
#like so: if lines[-2]: last_received = lines[-2]
buffer_string = lines[-1]
En cuanto al uso de readline()
: Esto es lo que la documentación PySerial tiene que decir (ligeramente editado para mayor claridad y con una mención a readlines ()):
Tenga cuidado al usar "readline". Hacer especificar un tiempo de espera cuando se abre la puerto serie, de lo contrario podría bloquear siempre si hay carácter de nueva línea es recibido. También tenga en cuenta que "readlines ()" sólo funciona con un tiempo de espera. Eso depende de tener un tiempo de espera y interpreta que como EOF (final del archivo).
que parece bastante razonable para mí!
Otros consejos
from serial import *
from threading import Thread
last_received = ''
def receiving(ser):
global last_received
buffer = ''
while True:
# last_received = ser.readline()
buffer += ser.read(ser.inWaiting())
if '\n' in buffer:
last_received, buffer = buffer.split('\n')[-2:]
if __name__ == '__main__':
ser = Serial(
port=None,
baudrate=9600,
bytesize=EIGHTBITS,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1,
xonxoff=0,
rtscts=0,
interCharTimeout=None
)
Thread(target=receiving, args=(ser,)).start()
Estas soluciones se aproveche la CPU a la espera de caracteres.
Usted debe hacer al menos una llamada de bloqueo para leer (1)
while True:
if '\n' in buffer:
pass # skip if a line already in buffer
else:
buffer += ser.read(1) # this will block until one more char or timeout
buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
... y hacer lo dividida que antes.
Puede utilizar ser.flushInput()
para eliminar todos los datos en serie que se encuentra actualmente en el búfer.
Después de la limpieza de los datos antiguos, puede ser.readline usuario () para obtener los datos más recientes del dispositivo serie.
Creo que es un poco más simple que las otras soluciones propuestas aquí. Trabajado para mí, espero que sea adecuado para usted.
Este método le permite controlar por separado el tiempo de espera para la recopilación de todos los datos para cada línea, y un tiempo de espera diferente para esperar en líneas adicionales.
# get the last line from serial port
lines = serial_com()
lines[-1]
def serial_com():
'''Serial communications: get a response'''
# open serial port
try:
serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
except serial.SerialException as e:
print("could not open serial port '{}': {}".format(com_port, e))
# read response from serial port
lines = []
while True:
line = serial_port.readline()
lines.append(line.decode('utf-8').rstrip())
# wait for new data after each line
timeout = time.time() + 0.1
while not serial_port.inWaiting() and timeout > time.time():
pass
if not serial_port.inWaiting():
break
#close the serial port
serial_port.close()
return lines
Usted necesitará un bucle para leer todo lo enviado, con la última llamada a readline () hasta que el bloqueo de tiempo de espera. Por lo tanto:
def readLastLine(ser):
last_data=''
while True:
data=ser.readline()
if data!='':
last_data=data
else:
return last_data
Ligera modificación al código mtasic y Vinay de Sajip:
Mientras que encontré este código muy útil para mí para una aplicación similar, que necesitaba todos las líneas que regresan de un dispositivo serie que enviaría la información periódicamente.
he optado por hacer estallar el primer elemento de la parte superior, grabarlo, y luego reunirse con el resto de elementos como el nuevo buffer y continuar desde allí.
Me doy cuenta de que esto es no Greg lo estaba pidiendo, pero pensé que valía la pena compartir como una nota al margen.
def receiving(ser):
global last_received
buffer = ''
while True:
buffer = buffer + ser.read(ser.inWaiting())
if '\n' in buffer:
lines = buffer.split('\n')
last_received = lines.pop(0)
buffer = '\n'.join(lines)
Uso .inWaiting()
dentro de un bucle infinito puede ser problemático. Se puede acaparar la totalidad de la CPU dependiendo de la aplicación. En su lugar, yo recomendaría usar un tamaño específico de datos a leer. Así que en este caso el siguiente debe hacerse, por ejemplo:
ser.read(1024)
El exceso de complicaciones
¿Cuál es la razón para dividir el objeto bytes por salto de línea o por otras manipulaciones de matriz? Escribo el método más simple, que va a resolver su problema:
import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
last = ''
for byte in s.read(s.inWaiting()): last += chr(byte)
if len(last) > 0:
# Do whatever you want with last
print (bytes(last, "utf-8"))
last = ''
Este es un ejemplo utilizando un envoltorio que le permite leer la línea más reciente sin el 100% de la CPU
class ReadLine:
"""
pyserial object wrapper for reading line
source: https://github.com/pyserial/pyserial/issues/216
"""
def __init__(self, s):
self.buf = bytearray()
self.s = s
def readline(self):
i = self.buf.find(b"\n")
if i >= 0:
r = self.buf[:i + 1]
self.buf = self.buf[i + 1:]
return r
while True:
i = max(1, min(2048, self.s.in_waiting))
data = self.s.read(i)
i = data.find(b"\n")
if i >= 0:
r = self.buf + data[:i + 1]
self.buf[0:] = data[i + 1:]
return r
else:
self.buf.extend(data)
s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
print(device.readline())