Come implementare un modo due JSONRPC + server di twisted / client
-
10-10-2019 - |
Domanda
Ciao Sto lavorando su sviluppare un server RPC in base a contorto per servire diversi microcontrollori che rendono chiamata RPC al server JSONRPC contorto. Ma l'applicazione richiede inoltre che le informazioni sul server di invio a ciascun micro in qualsiasi momento, quindi la domanda è come potrebbe essere una buona pratica per impedire che la risposta da una chiamata JSONRPC remoto da un micro essere confuso con una richiesta del server JSONRPC che è fatto per un utente.
La conseguenza che sto avendo ora è che micro stanno ricevendo cattiva informazione, perché non so se stringa netstring / JSON che sta venendo da presa è la loro risposta ad una disposizione precedente o è una nuova richiesta dal server.
Ecco il mio codice:
from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref
creds = {'user1':'pass1','user2':'pass2','user3':'pass3'}
class arduinoRPC(jsonrpc.JSONRPC):
def connectionMade(self):
pass
def jsonrpc_identify(self,username,password,mac):
""" Each client must be authenticated just after to be connected calling this rpc """
if creds.has_key(username):
if creds[username] == password:
authenticated = True
else:
authenticated = False
else:
authenticated = False
if authenticated:
self.factory.clients.append(self)
self.factory.references[mac] = weakref.ref(self)
return {'results':'Authenticated as %s'%username,'error':None}
else:
self.transport.loseConnection()
def jsonrpc_sync_acq(self,data,f):
"""Save into django table data acquired from sensors and send ack to gateway"""
if not (self in self.factory.clients):
self.transport.loseConnection()
print f
return {'results':'synced %s records'%len(data),'error':'null'}
def connectionLost(self, reason):
""" mac address is searched and all reference to self.factory.clientes are erased """
for mac in self.factory.references.keys():
if self.factory.references[mac]() == self:
print 'Connection closed - Mac address: %s'%mac
del self.factory.references[mac]
self.factory.clients.remove(self)
class rpcfactory(jsonrpc.RPCFactory):
protocol = arduinoRPC
def __init__(self, maxLength=1024):
self.maxLength = maxLength
self.subHandlers = {}
self.clients = []
self.references = {}
""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
def __init__(self,rpcfactory):
threading.Thread.__init__(self)
self.rpcfactory = rpcfactory
"""identifiers of each micro/client connected"""
self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
def run(self):
while True:
time.sleep(10)
while True:
""" call to any of three potential micros connected """
mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
if self.rpcfactory.references.has_key(mac):
print 'Calling %s'%mac
proto = self.rpcfactory.references[mac]()
""" requesting echo from selected micro"""
dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
proto.transport.write(dataToSend)
break
factory = rpcfactory(arduinoRPC)
"""start thread caller"""
r=asyncGatewayCalls(factory)
r.start()
reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
Soluzione
È necessario aggiungere un abbastanza informazioni per ogni messaggio in modo che il destinatario possa determinare come interpretarla. Le vostre esigenze suoni molto simili a quelle di AMP , così si potrebbe usare sia AMP invece o utilizzare la stessa struttura di AMP per identificare i messaggi. In particolare:
- Nella richieste, ha messo una chiave particolare - per esempio, AMP utilizza "_ask" per identificare le richieste. Si dà anche questi un valore unico, che ha ulteriormente identifica tale richiesta per tutta la durata della connessione.
- In risposta, ha messo una chiave diversa - ad esempio, AMP utilizza "_answer" per questo. Il valore corrisponde con il valore della chiave "_ask" nella richiesta la risposta è per.
L'utilizzo di un approccio di questo tipo, basta guardare per vedere se v'è una chiave "_ask" o un tasto "_answer" per determinare se hai ricevuto una nuova richiesta o una risposta ad una richiesta precedente.
Su un argomento a parte, la classe asyncGatewayCalls
non dovrebbe essere filo-based. Non c'è nessuna ragione apparente per poter utilizzare i thread, e così facendo è anche abusando API intrecciati in un modo che porterà a un comportamento indefinito. La maggior parte ritorto API può essere utilizzato solo nel thread in cui hai chiamato reactor.run
. L'unica eccezione è reactor.callFromThread
, che è possibile utilizzare per inviare un messaggio al thread reattore da qualsiasi altro thread. asyncGatewayCalls
tenta di scrivere ad un trasporto, però, che porterà a tamponare la corruzione o ritardi arbitrari nei dati inviati, o forse le cose peggiori. Invece, è possibile scrivere asyncGatewayCalls
in questo modo:
from twisted.internet.task import LoopingCall
class asyncGatewayCalls(object):
def __init__(self, rpcfactory):
self.rpcfactory = rpcfactory
self.remoteMacList = [...]
def run():
self._call = LoopingCall(self._pokeMicro)
return self._call.start(10)
def _pokeMicro(self):
while True:
mac = self.remoteMacList[...]
if mac in self.rpcfactory.references:
proto = ...
dataToSend = ...
proto.transport.write(dataToSend)
break
factory = ...
r = asyncGatewayCalls(factory)
r.run()
reactor.listenTCP(7080, factory)
reactor.run()
Questo ti dà una soluzione single-threaded che dovrebbe avere lo stesso comportamento come desiderato per la classe asyncGatewayCalls
originale. Invece di dormire in un ciclo in un thread per programmare le chiamate, però, utilizza le API di pianificazione del reattore (tramite la classe di livello superiore LoopingCall, che orari attrattive essere richiamata ripetutamente) per assicurarsi _pokeMicro
viene chiamato ogni secondi dieci .