双方向の jsonrpc + ツイスト サーバー/クライアントを実装する方法
-
10-10-2019 - |
質問
こんにちは、私は、Twisted jsonrpc サーバーへの RPC 呼び出しを行ういくつかのマイクロコントローラーにサービスを提供するために、Twisted に基づいた RPC サーバーの開発に取り組んでいます。しかし、このアプリケーションでは、サーバーがいつでも各マイクロに情報を送信する必要があるため、マイクロからのリモート jsonrpc 呼び出しからの応答が、サーバーに対して行われた jsonrpc リクエストと混同されないようにするにはどうすればよいかが問題になります。ユーザー。
私が今抱えている結果は、ソケットから来るネット文字列/json文字列が以前の要件からの応答であるか、サーバーからの新しい要求であるかわからないため、マイクロが間違った情報を受信していることです。
これが私のコードです:
from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref
creds = {'user1':'pass1','user2':'pass2','user3':'pass3'}
class arduinoRPC(jsonrpc.JSONRPC):
def connectionMade(self):
pass
def jsonrpc_identify(self,username,password,mac):
""" Each client must be authenticated just after to be connected calling this rpc """
if creds.has_key(username):
if creds[username] == password:
authenticated = True
else:
authenticated = False
else:
authenticated = False
if authenticated:
self.factory.clients.append(self)
self.factory.references[mac] = weakref.ref(self)
return {'results':'Authenticated as %s'%username,'error':None}
else:
self.transport.loseConnection()
def jsonrpc_sync_acq(self,data,f):
"""Save into django table data acquired from sensors and send ack to gateway"""
if not (self in self.factory.clients):
self.transport.loseConnection()
print f
return {'results':'synced %s records'%len(data),'error':'null'}
def connectionLost(self, reason):
""" mac address is searched and all reference to self.factory.clientes are erased """
for mac in self.factory.references.keys():
if self.factory.references[mac]() == self:
print 'Connection closed - Mac address: %s'%mac
del self.factory.references[mac]
self.factory.clients.remove(self)
class rpcfactory(jsonrpc.RPCFactory):
protocol = arduinoRPC
def __init__(self, maxLength=1024):
self.maxLength = maxLength
self.subHandlers = {}
self.clients = []
self.references = {}
""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
def __init__(self,rpcfactory):
threading.Thread.__init__(self)
self.rpcfactory = rpcfactory
"""identifiers of each micro/client connected"""
self.remoteMacList = ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
def run(self):
while True:
time.sleep(10)
while True:
""" call to any of three potential micros connected """
mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
if self.rpcfactory.references.has_key(mac):
print 'Calling %s'%mac
proto = self.rpcfactory.references[mac]()
""" requesting echo from selected micro"""
dataToSend = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
proto.transport.write(dataToSend)
break
factory = rpcfactory(arduinoRPC)
"""start thread caller"""
r=asyncGatewayCalls(factory)
r.start()
reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
解決
受信者がメッセージをどのように解釈できるかを判断できるように、各メッセージに十分な情報を追加する必要があります。あなたの要件は次の要件と非常によく似ています アンプ, したがって、代わりに AMP を使用するか、AMP と同じ構造を使用してメッセージを識別することができます。具体的には:
- リクエストには特定のキーを入力します。たとえば、AMP は「_ask」を使用してリクエストを識別します。また、これらに一意の値を与え、接続の存続期間中そのリクエストをさらに識別します。
- 応答には別のキーを入力します。たとえば、AMP はこれに「_answer」を使用します。この値は、応答の対象となるリクエストの「_ask」キーの値と一致します。
このようなアプローチを使用すると、「_ask」キーまたは「_answer」キーがあるかどうかを確認するだけで、新しいリクエストを受信したか、前のリクエストに対する応答を受信したかを判断できます。
別のトピックでは、あなたの asyncGatewayCalls
クラスはスレッドベースであってはなりません。スレッドを使用する明確な理由はなく、スレッドを使用することにより、未定義の動作を引き起こす方法で Twisted API を悪用することになります。ほとんどの Twisted API は、呼び出したスレッドでのみ使用できます。 reactor.run
. 。唯一の例外は、 reactor.callFromThread
, これを使用して、他のスレッドからリアクター スレッドにメッセージを送信できます。 asyncGatewayCalls
ただし、トランスポートに書き込もうとすると、バッファの破損や送信データの恣意的な遅延、あるいはさらに悪いことが発生する可能性があります。代わりに、次のように書くこともできます asyncGatewayCalls
このような:
from twisted.internet.task import LoopingCall
class asyncGatewayCalls(object):
def __init__(self, rpcfactory):
self.rpcfactory = rpcfactory
self.remoteMacList = [...]
def run():
self._call = LoopingCall(self._pokeMicro)
return self._call.start(10)
def _pokeMicro(self):
while True:
mac = self.remoteMacList[...]
if mac in self.rpcfactory.references:
proto = ...
dataToSend = ...
proto.transport.write(dataToSend)
break
factory = ...
r = asyncGatewayCalls(factory)
r.run()
reactor.listenTCP(7080, factory)
reactor.run()
これにより、元のソリューションで意図したのと同じ動作をするシングルスレッドのソリューションが得られます。 asyncGatewayCalls
クラス。ただし、呼び出しをスケジュールするためにスレッド内のループでスリープするのではなく、リアクターのスケジューリング API (繰り返し呼び出される内容をスケジュールする高レベルの LoopingCall クラス経由) を使用して、 _pokeMicro
10秒ごとに呼び出されます。