pyserial - كيفية قراءة السطر الأخير تم ارسالها من الجهاز التسلسلي

StackOverflow https://stackoverflow.com/questions/1093598

سؤال

لدي اردوينو متصلا إلى جهاز الكمبيوتر تشغيل حلقة إرسال قيمة عبر منفذ تسلسلي إلى جهاز الكمبيوتر كل 100 مللي.

أنا تريد أن تجعل من بيثون السيناريو الذي سيتم قراءة من المنفذ التسلسلي فقط كل بضع ثوان ، لذلك أريد أن أرى آخر شيء المرسلة من اردوينو.

كيف يمكنك أن تفعل هذا في Pyserial?

هنا هو رمز حاولت التي لا تعمل.يقرأ الخطوط بالتتابع.

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?
هل كانت مفيدة؟

المحلول

ربما أسيء فهم سؤالك، ولكن كما هو خط تسلسلي، سيتعين عليك قراءة كل شيء تم إرساله من Arduino بالتتابع - سوف يتم تخفيفه في Arduino حتى تقرأه.

إذا كنت ترغب في الحصول على عرض الحالة الذي يوضح أحدث شيء أرسل - استخدم مؤشر ترابط يشتمل على التعليمات البرمجية في سؤالك (ناقص النوم)، والحفاظ على آخر سطر كامل قراءة كخط آخر من Arduino.

تحديث: mtasicرمز المثال مثال جيد جدا، ولكن إذا أرسل Arduino خطا جزئيا عندما inWaiting() يسمى، ستحصل على خط مقطوع. بدلا من ذلك، ما تريد القيام به هو وضع الأخير اكتمال خط إلى last_received, ، والحفاظ على الخط الجزئي في buffer بحيث يمكن إلحاق ذلك في المرة القادمة حول الحلقة. شيء من هذا القبيل:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

فيما يتعلق باستخدام readline(): إليك ما يجب أن تقوله وثائق المهارة (تم تحريرها قليلا للوضوح ومع ذكر القوارض ()):

كن حذرا عند استخدام "ReadLine". قم بتحديد مهلة عند فتح المنفذ التسلسلي، وإلا فقد يمكنه حظره إلى الأبد إذا لم يتم استلام حرف Newline. لاحظ أيضا أن "الخضرة ()" يعمل فقط مع مهلة. ذلك يعتمد على وجود مهلة وتفسير أنه مثل EOF (نهاية الملف).

التي تبدو معقولة جدا بالنسبة لي!

نصائح أخرى

from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __name__ ==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()

هذه الحلول سوف خنزير وحدة المعالجة المركزية أثناء انتظار الشخصيات.

يجب أن تفعل مكالمة حظر واحدة على الأقل لقراءة (1)

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

... وقم بتقسيم شيء كما كان من قبل.

يمكنك استخدام ser.flushInput() لتخفيف جميع البيانات التسلسلية الموجودة حاليا في المخزن المؤقت.

بعد مسح البيانات القديمة، يمكنك المستخدم Ser.Readline () للحصول على أحدث بيانات من الجهاز التسلسلي.

أعتقد أنها أبسط قليلا من الحلول المقترحة الأخرى هنا. عملت بالنسبة لي، آمل أن تكون مناسبة لك.

تتيح لك هذه الطريقة التحكم بشكل منفصل في مهلة تجمع جميع البيانات لكل سطر، وهناز مهلة مختلفة للانتظار على خطوط إضافية.

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines

ستحتاج إلى حلقة لقراءة كل شيء أرسلت، مع حظر آخر مكالمة ل Readline () حتى المهلة. وبالتالي:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data

تعديل طفيف على mtasic & فيناي Sajip كود:

في حين وجدت هذه المدونة مفيدة جدا لي طلب مماثل ، أنا في حاجة كل خطوط العودة من المسلسل جهاز إرسال المعلومات بشكل دوري.

أنا اختار أن البوب العنصر الأول قبالة رأس ، سجل ، ومن ثم الانضمام العناصر المتبقية الجديد العازلة وتواصل من هناك.

وأنا أدرك أن هذا هو لا ما جريج كان طالبا ، ولكن أعتقد أنه كان يستحق المشاركة كملاحظة جانبية.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)

استخدام .inWaiting() داخل حلقة لانهائية قد تكون مشكلة. قد يكون خنزير كامل وحدة المعالجة المركزية اعتمادا على التنفيذ. بدلا من ذلك، أود أن أوصي باستخدام حجم معين من البيانات ليتم قراءته. لذلك في هذه الحالة، يجب القيام بما يلي على سبيل المثال:

ser.read(1024)

الكثير من المضاعفات

ما هو السبب في تقسيم كائن البايتات بواسطة Newline أو عن طريق معالجة صفيف أخرى؟ أكتب أبسط طريقة، والتي ستحل مشكلتك:

import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
    last = ''
    for byte in s.read(s.inWaiting()): last += chr(byte)
    if len(last) > 0:
        # Do whatever you want with last
        print (bytes(last, "utf-8"))
        last = ''

فيما يلي مثال باستخدام غلاف يسمح لك بقراءة الخط الأحدث دون وحدة المعالجة المركزية بنسبة 100٪

class ReadLine:
    """
    pyserial object wrapper for reading line
    source: https://github.com/pyserial/pyserial/issues/216
    """
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s

    def readline(self):
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[:i + 1]
            self.buf = self.buf[i + 1:]
            return r
        while True:
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[:i + 1]
                self.buf[0:] = data[i + 1:]
                return r
            else:
                self.buf.extend(data)

s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
    print(device.readline())
مرخصة بموجب: CC-BY-SA مع الإسناد
لا تنتمي إلى StackOverflow
scroll top