Frage

Hallo Ich arbeite einen RPC-Server auf der Entwicklung auf Basis von Twisted mehrere Mikrocontroller zu dienen, den RPC-Aufruf an Twisted JSON-RPC-Server zu machen. Aber die Anwendung auch erforderlich, dass jederzeit zu jedem Mikroinformationsserver senden, so ist die Frage, wie eine gute Praxis sein könnte, um zu verhindern, dass die Antwort von einem Remote-JSON-RPC Anruf von einem Mikro mit einer Server JSON-RPC Anfrage verwechselt wird, die für gemacht wird Ein Benutzer.

Die Folge, dass ich jetzt habe, ist, dass Micros schlechte Informationen erhalten, weil sie nicht wissen, ob netstring / json Zeichenfolge, die von der Buchse comming ist ihre Antwort aus einer früheren Anforderung ist oder eine neue Anforderung vom Server.

Hier ist mein Code:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds  = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass

    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False

        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()

    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}

    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """  
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients    =   []
        self.references =   {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory =   rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList    =   ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """ 
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto   =   self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend  = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break

factory = rpcfactory(arduinoRPC)

"""start thread caller""" 
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
War es hilfreich?

Lösung

Sie müssen eine genügend Informationen, um jede Nachricht hinzuzufügen, so dass der Empfänger kann bestimmen, wie sie zu interpretieren. Ihre Anforderungen klingt sehr ähnlich denen von AMP , so könnten Sie entweder verwenden AMP statt oder die gleiche Struktur wie AMP verwenden, um Ihre Nachrichten zu identifizieren. Im Einzelnen:

  • In Anfragen, setzen Sie einen bestimmten Schlüssel - zum Beispiel AMP verwendet „_ask“ Anfragen zu identifizieren. Es gibt auch diese einen eindeutigen Wert, ferner identifiziert, dass Anforderung für die gesamte Lebensdauer der Verbindung.
  • In Antworten, setzen Sie einen anderen Schlüssel - zum Beispiel AMP verwendet „_answer“ für diese. Der Wert entspricht dem Wert aus dem „_ask“ -Taste in der Anfrage auf die Antwort für.

einen Ansatz wie dies verwendet, man muss nur schauen, um zu sehen, ob es ein „_ask“ -Taste oder „_answer“ -Taste, um zu bestimmen, ob Sie eine neue Anforderung oder eine Antwort auf eine frühere Anfrage erhalten.

Auf einem separaten Thema, Ihre asyncGatewayCalls Klasse sollte nicht Thread-basiert sein. Es gibt keinen ersichtlichen Grund dafür Threads zu verwenden, und indem so ist es auch in einer Art und Weise Twisted-APIs mißbraucht, die zu undefiniertem Verhalten führen. Die meisten Twisted-APIs kann nur in dem Thread verwendet werden, in dem Sie reactor.run genannt. Die einzige Ausnahme ist reactor.callFromThread, mit dem Sie eine Nachricht an den Reaktor Thread senden von einem anderen Thread verwenden können. asyncGatewayCalls versucht zu schreiben, um einen Transport, obwohl, die schlimmere Dinge geschickt, oder vielleicht ist Korruption oder willkürliche Verzögerungen in den Daten zu puffern führen. Stattdessen können Sie asyncGatewayCalls wie folgt schreiben:

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run():
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

Das gibt Ihnen eine Single-Threaded-Lösung, die das gleiche Verhalten haben sollte, wie Sie für die ursprüngliche asyncGatewayCalls Klasse bestimmt. Statt in einem Thread, um in einer Schleife schlafen, die Anrufe zu planen, obwohl es den Scheduling-APIs des Reaktors verwendet (über die übergeordneten LoopingCall-Klasse, die Zeitplan Dinge immer wieder aufgerufen werden) sicher _pokeMicro machen wird alle zehn Sekunden aufgerufen .

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top