Question

I have several IO operation that I carry out on class init but they often fail with IOError. What I would like to do is delay a few hundred ms and try again until success or some defined timeout. How can I make sure each individual command succeeds before continuing/ending the loop? I assume there is a better way than an if statement for each item and a counter to check if all commands succeeded.

My current code below often fails with IOError and hangs the rest of the application.

   def __init__(self):
      print("Pressure init.")
      self.readCoefficients()

   def readCoefficients(self):
      global a0_MSB;
      global a0_LSB;
      global b1_MSB;
      global b1_LSB;
      global b2_MSB;
      global b2_LSB;
      global c12_MSB;
      global c12_LSB;

      a0_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0);
      a0_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0);

      b1_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0);
      b1_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0);

      b2_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0);
      b2_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0);

      c12_MSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0);
      c12_LSB = Pressure.bus.read_byte_data(Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0);
Was it helpful?

Solution 2

If it's OK for you that all the files are read one after the other, you can use a simple function.

import time

# ...

def readCoefficients(self):
    global a0_MSB;
    global a0_LSB;
    global b1_MSB;
    global b1_LSB;
    global b2_MSB;
    global b2_LSB;
    global c12_MSB;
    global c12_LSB;

    max_retries = 15

    a0_MSB = self.readretry(Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0, max_retries)
    a0_LSB = self.readretry(Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0, max_retries)

    b1_MSB = self.readretry(Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0, max_retries)
    b1_LSB = self.readretry(Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0, max_retries)

    b2_MSB = self.readretry(Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0, max_retries)
    b2_LSB = self.readretry(Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0, max_retries)

    c12_MSB = self.readretry(Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0, max_retries)
    c12_LSB = self.readretry(Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0, max_retries)

    def readretry(self, address, max_retries):
        for i in range(max_retries):
            try:
                return Pressure.bus.read_byte_data(
                    Pressure.MPL115A2_ADDRESS,
                    address
                )
            except IOError as e:
                # print(e)
                time.sleep(0.1)
        else:
            raise IOError("Reading failed after multiple tries")

Note: You should not use globals, most specially in classes.

OTHER TIPS

Are you wanting to retry each one of those last 8 lines independently or as a group? If independently you will want to make a little helper function:

def retry_function(tries, function, *args, **kwargs):
    for try in range(tries):
        try:
            return function(*args, **kwargs)
        except IOError as e:
            time.sleep(.005)
    raise e   # will be the last error from inside the loop. be sure tries is at least 1 or this will be undefined!

Then call it like this:

a0_MSB = retry_function(5, Pressure.bus.read_byte_data, Pressure.MPL115A2_ADDRESS,Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0)

If not independently but as a group, you probably still want this helper function. But you'll have to rewrite it to handle a list of functions/arguments, or pass in another custom function

This is another way of doing it. this code tries to read all addresses, and saves the one that failed. Then waits a little and retries all the addresses that failed until all addresses have been read properly or the number of allowed retries exceeded.

def readCoefficients(self):
    (
        a0_MSB, a0_LSB,
        b1_MSB, b1_LSB,
        b2_MSB, b2_LSB,
        c12_MSB, c12_LSB) = self.mio_read(15,
            Pressure.MPL115A2_REGISTER_A0_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_A0_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_B1_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_B1_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_B2_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_B2_COEFF_LSB+0,
            Pressure.MPL115A2_REGISTER_C12_COEFF_MSB+0,
            Pressure.MPL115A2_REGISTER_C12_COEFF_LSB+0
    )

def mio_read(self, max_retries, *addresses):
    # Create storage for results
    results = [None] * len(addresses)
    # Keep track of the index of a particular address in the list of results
    ios = list(enumerate(addresses))
    for i in range(max_retries):
        failedios = []
        for index, address in ios:
            try:
                results[index] = Pressure.bus.read_byte_data(
                    Pressure.MPL115A2_ADDRESS,
                    address
                )
            except IOError as e:
                # Place address in the queue for the next round
                failedios.append((index, address))
        # If all succeeded
        if len(failedios) == 0:
            return results
        # Time may be reduced as so was spent checking other addresses
        time.sleep(0.1)
        ios = failedios
    else:
        raise IOError(",".join((addr for ind, addr in failedios)))
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top