Pregunta

Tengo un Arduino conectado a mi equipo que ejecuta un bucle, el envío de un valor través del puerto serie de vuelta al ordenador cada 100 ms.

Quiero hacer un script en Python que leerá desde el puerto serie sólo cada pocos segundos, por lo que yo quiero que acaba de ver la última cosa enviada desde el Arduino.

¿Cómo se hace esto en PySerial?

Este es el código Probé el que does't trabajo. Se lee las líneas secuencialmente.

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?
¿Fue útil?

Solución

Tal vez estoy malentendido su pregunta, pero ya que es una línea serie, que tendrá que leer todo lo enviado desde la secuencialmente Arduino -. Que va a ser amortiguada en el Arduino, hasta que lo lea

Si usted quiere tener una pantalla de estado que muestra lo último enviado -. Utilizar un hilo que incorpora el código en su pregunta (menos el sueño), y tener la última línea completa leer como la última línea de la Arduino

Actualización: código de ejemplo de mtasic es bastante bueno, pero si el Arduino ha enviado una línea parcial cuando inWaiting() se llama, obtendrá una línea truncada. En cambio, lo que quiere hacer es poner el último línea completa en last_received, y mantener la línea parcial en buffer de manera que se puede añadir a la próxima vez que el bucle. Algo como esto:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

En cuanto al uso de readline(): Esto es lo que la documentación PySerial tiene que decir (ligeramente editado para mayor claridad y con una mención a readlines ()):

  

Tenga cuidado al usar "readline". Hacer   especificar un tiempo de espera cuando se abre la   puerto serie, de lo contrario podría bloquear   siempre si hay carácter de nueva línea es   recibido. También tenga en cuenta que "readlines ()"   sólo funciona con un tiempo de espera. Eso   depende de tener un tiempo de espera y   interpreta que como EOF (final del archivo).

que parece bastante razonable para mí!

Otros consejos

from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __name__ ==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()

Estas soluciones se aproveche la CPU a la espera de caracteres.

Usted debe hacer al menos una llamada de bloqueo para leer (1)

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

... y hacer lo dividida que antes.

Puede utilizar ser.flushInput() para eliminar todos los datos en serie que se encuentra actualmente en el búfer.

Después de la limpieza de los datos antiguos, puede ser.readline usuario () para obtener los datos más recientes del dispositivo serie.

Creo que es un poco más simple que las otras soluciones propuestas aquí. Trabajado para mí, espero que sea adecuado para usted.

Este método le permite controlar por separado el tiempo de espera para la recopilación de todos los datos para cada línea, y un tiempo de espera diferente para esperar en líneas adicionales.

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines

Usted necesitará un bucle para leer todo lo enviado, con la última llamada a readline () hasta que el bloqueo de tiempo de espera. Por lo tanto:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data

Ligera modificación al código mtasic y Vinay de Sajip:

Mientras que encontré este código muy útil para mí para una aplicación similar, que necesitaba todos las líneas que regresan de un dispositivo serie que enviaría la información periódicamente.

he optado por hacer estallar el primer elemento de la parte superior, grabarlo, y luego reunirse con el resto de elementos como el nuevo buffer y continuar desde allí.

Me doy cuenta de que esto es no Greg lo estaba pidiendo, pero pensé que valía la pena compartir como una nota al margen.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)

El exceso de complicaciones

¿Cuál es la razón para dividir el objeto bytes por salto de línea o por otras manipulaciones de matriz? Escribo el método más simple, que va a resolver su problema:

import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
    last = ''
    for byte in s.read(s.inWaiting()): last += chr(byte)
    if len(last) > 0:
        # Do whatever you want with last
        print (bytes(last, "utf-8"))
        last = ''

Este es un ejemplo utilizando un envoltorio que le permite leer la línea más reciente sin el 100% de la CPU

class ReadLine:
    """
    pyserial object wrapper for reading line
    source: https://github.com/pyserial/pyserial/issues/216
    """
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s

    def readline(self):
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[:i + 1]
            self.buf = self.buf[i + 1:]
            return r
        while True:
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[:i + 1]
                self.buf[0:] = data[i + 1:]
                return r
            else:
                self.buf.extend(data)

s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
    print(device.readline())
scroll top