Pregunta

Hola estoy trabajando en el desarrollo de un servidor basado en RPC trenzado para servir a varios microcontroladores que hacen llamada RPC al servidor JSONRPC trenzado. Pero la aplicación también requiere que la información de envío del servidor para cada micro en cualquier momento, por lo que la pregunta es ¿cómo podría ser una buena práctica para evitar que la respuesta de una llamada JSONRPC remoto desde un micro ser confundida con una solicitud JSONRPC servidor que está hecho para un usuario.

La consecuencia que estoy teniendo ahora es que los micros están recibiendo información errónea, porque no saben si la cadena netstring / JSON que está viniendo de la toma es su respuesta a un requisito previo o es una nueva petición de servidor.

Aquí está mi código:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds  = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass

    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False

        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()

    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}

    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """  
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients    =   []
        self.references =   {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory =   rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList    =   ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """ 
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto   =   self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend  = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break

factory = rpcfactory(arduinoRPC)

"""start thread caller""" 
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
¿Fue útil?

Solución

Es necesario añadir una suficiente información para cada mensaje para que el destinatario puede determinar cómo interpretarlo. Sus requisitos de sonidos muy similares a las de AMP , por lo que podría o bien utilizar AMP lugar o utilizar la misma estructura que la AMP para identificar sus mensajes. Específicamente:

  • En las solicitudes, puso una clave particular - por ejemplo, AMP utiliza "_ask" para identificar las solicitudes. También da a éstos un valor único, que además identifica dicha solicitud de la vida útil de la conexión.
  • En las respuestas, puso una clave diferente - por ejemplo, AMP utiliza "_answer" para esto. El valor coincide con el valor de la clave "_ask" en la petición de la respuesta es para.

El uso de un enfoque de este tipo, sólo hay que mirar para ver si hay una tecla "_ask" o una tecla de "_answer" para determinar si ha recibido una nueva solicitud o una respuesta a una petición anterior.

En un tema aparte, la clase asyncGatewayCalls no debe ser a base de hilo. No hay razón aparente para que utilice hilos, y al hacerlo, también está haciendo mal uso API retorcida de tal forma que dará lugar a un comportamiento indefinido. La mayoría de Twisted API sólo se puede utilizar en el hilo en el que se ha llamado reactor.run. La única excepción es reactor.callFromThread, que se puede utilizar para enviar un mensaje desde cualquier otro hilo al hilo del reactor. asyncGatewayCalls intenta escribir en un transporte, sin embargo, lo que conducirá a amortiguar la corrupción o retrasos arbitrarios en los datos que se envían, o tal vez cosas peores. En su lugar, puede escribir asyncGatewayCalls como esto:

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run():
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

Esto le da una solución de un solo subproceso que debe tener el mismo comportamiento que se pretende para la clase asyncGatewayCalls originales. En vez de dormir en un bucle en un hilo, para marcar las llamadas, sin embargo, que utiliza las API de programación del reactor (a través de la clase de nivel superior LoopingCall, que programa las cosas que se llama repetidamente) para asegurarse de _pokeMicro se vuelve a llamar cada segundo diez .

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top