pyserial - Como ler a última linha enviada a partir de um dispositivo serial
-
11-09-2019 - |
Pergunta
Eu tenho um Arduino conectado ao meu computador que executa um loop, enviando um valor sobre o encosto de porta serial para o computador a cada 100 ms.
Eu quero fazer um script Python que irá ler a partir da porta serial apenas a cada poucos segundos, então eu quero que ele só vê a última coisa enviada do Arduino.
Como você faz isso em pyserial?
Aqui está o código que eu tentei que o trabalho does't. Ele lê as linhas sequencialmente.
import serial
import time
ser = serial.Serial('com4',9600,timeout=1)
while 1:
time.sleep(10)
print ser.readline() #How do I get the most recent line sent from the device?
Solução
Talvez eu esteja mal-entendido a sua pergunta, mas como é uma linha de série, você vai ter que ler tudo o enviado do Arduino sequencialmente -. Ele vai ser tamponado até no Arduino até que você lê-lo
Se você quer ter uma exibição de estado que mostra a última coisa enviado - usar um fio que incorpora o código na sua pergunta (menos o sono), e manter a última linha completa lido como a última linha do Arduino <. / p>
Update: código de exemplo de mtasic
é muito bom, mas se o Arduino enviou uma linha parcial quando inWaiting()
é chamado, você vai ter uma linha truncado. Em vez disso, o que você quer fazer é colocar a última completo linha para last_received
, e manter a linha parcial em buffer
para que ele possa ser anexado à próxima vez em volta do loop. Algo parecido com isto:
def receiving(ser):
global last_received
buffer_string = ''
while True:
buffer_string = buffer_string + ser.read(ser.inWaiting())
if '\n' in buffer_string:
lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
last_received = lines[-2]
#If the Arduino sends lots of empty lines, you'll lose the
#last filled line, so you could make the above statement conditional
#like so: if lines[-2]: last_received = lines[-2]
buffer_string = lines[-1]
Quanto ao uso de readline()
: Aqui está o que a documentação pyserial tem a dizer (ligeiramente editado para maior clareza e com uma menção ao readlines ()):
Tenha cuidado ao usar "readline". Faz especificar um tempo limite ao abrir o porta serial, caso contrário, poderia bloquear sempre se nenhum caractere de nova linha é recebido. Note também que "readlines ()" só funciona com um tempo limite. isto depende de ter um tempo de espera e interpreta isso como EOF (fim de arquivo).
que parece bastante razoável para mim!
Outras dicas
from serial import *
from threading import Thread
last_received = ''
def receiving(ser):
global last_received
buffer = ''
while True:
# last_received = ser.readline()
buffer += ser.read(ser.inWaiting())
if '\n' in buffer:
last_received, buffer = buffer.split('\n')[-2:]
if __name__ == '__main__':
ser = Serial(
port=None,
baudrate=9600,
bytesize=EIGHTBITS,
parity=PARITY_NONE,
stopbits=STOPBITS_ONE,
timeout=0.1,
xonxoff=0,
rtscts=0,
interCharTimeout=None
)
Thread(target=receiving, args=(ser,)).start()
Essas soluções irão monopolizar a CPU enquanto aguarda caracteres.
Você deve fazer pelo menos uma chamada de bloqueio para ler (1)
while True:
if '\n' in buffer:
pass # skip if a line already in buffer
else:
buffer += ser.read(1) # this will block until one more char or timeout
buffer += ser.read(ser.inWaiting()) # get remaining buffered chars
... e fazer a coisa dividida como antes.
Você pode usar ser.flushInput()
para expulsar todos os dados de série que é atualmente no buffer.
Depois de limpar os dados antigos, você pode ser.readline utilizador () para obter os dados mais recentes do dispositivo serial.
Eu acho que é um pouco mais simples do que as outras soluções propostas aqui. Trabalhou para mim, espero que seja adequado para você.
Este método permite controlar separadamente o tempo limite para a recolha de todos os dados para cada linha, e um tempo limite diferente para esperando em linhas adicionais.
# get the last line from serial port
lines = serial_com()
lines[-1]
def serial_com():
'''Serial communications: get a response'''
# open serial port
try:
serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
except serial.SerialException as e:
print("could not open serial port '{}': {}".format(com_port, e))
# read response from serial port
lines = []
while True:
line = serial_port.readline()
lines.append(line.decode('utf-8').rstrip())
# wait for new data after each line
timeout = time.time() + 0.1
while not serial_port.inWaiting() and timeout > time.time():
pass
if not serial_port.inWaiting():
break
#close the serial port
serial_port.close()
return lines
Você vai precisar de um loop para ler tudo enviado, com a última chamada para readline () o bloqueio até que o tempo limite. Assim:
def readLastLine(ser):
last_data=''
while True:
data=ser.readline()
if data!='':
last_data=data
else:
return last_data
Ligeira modificação ao código mtasic & de Vinay Sajip:
Enquanto eu encontrei este código bastante útil para mim por um pedido semelhante, eu precisava de todas as linhas voltando de um dispositivo de série que iria enviar informações periodicamente.
I optou por colocar o primeiro elemento do topo, gravá-lo, e depois voltar a elementos restantes como o novo tampão e continuar a partir daí.
Eu percebo que este é não que Greg estava pedindo, mas achei que valia a pena compartilhar como uma nota lateral.
def receiving(ser):
global last_received
buffer = ''
while True:
buffer = buffer + ser.read(ser.inWaiting())
if '\n' in buffer:
lines = buffer.split('\n')
last_received = lines.pop(0)
buffer = '\n'.join(lines)
Usando .inWaiting()
dentro de um loop infinito pode ser problemático. Pode monopolizar a toda CPU dependendo da implementação. Em vez disso, eu recomendo usar um tamanho específico de dados a ser lido. Portanto, neste caso o seguinte deve ser feito, por exemplo:
ser.read(1024)
Demasiado complicações
Qual é a razão para dividir o objeto bytes por nova linha ou por outras manipulações de matriz? Eu escrevo o método mais simples, que vai resolver o seu problema:
import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
last = ''
for byte in s.read(s.inWaiting()): last += chr(byte)
if len(last) > 0:
# Do whatever you want with last
print (bytes(last, "utf-8"))
last = ''
Aqui está um exemplo usando uma interface que permite que você leia a linha mais recente sem 100% de CPU
class ReadLine:
"""
pyserial object wrapper for reading line
source: https://github.com/pyserial/pyserial/issues/216
"""
def __init__(self, s):
self.buf = bytearray()
self.s = s
def readline(self):
i = self.buf.find(b"\n")
if i >= 0:
r = self.buf[:i + 1]
self.buf = self.buf[i + 1:]
return r
while True:
i = max(1, min(2048, self.s.in_waiting))
data = self.s.read(i)
i = data.find(b"\n")
if i >= 0:
r = self.buf + data[:i + 1]
self.buf[0:] = data[i + 1:]
return r
else:
self.buf.extend(data)
s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
print(device.readline())