Вопрос

Здравствуйте, я работаю над разработкой RPC -сервера на основе Twisted, чтобы обслуживать несколько микроконтроллеров, которые делают вызов RPC на Twisted JSONRPC Server. Но приложение также требовало, чтобы сервер отправлял информацию в каждый микрофон в любое время, поэтому вопрос заключается в том, как может быть хорошей практикой, чтобы предотвратить этот ответ от удаленного вызова JSONRPC из микро -микросхемы с запросом на сервер JSONRPC, который создан для Пользователь.

Следствием того, что у меня сейчас есть то, что микросхема получает плохую информацию, потому что они не знают, является ли строка Netstring/Json, которая выходит из сокета, является их ответом от предыдущего требования или является новым запросом с сервера.

Вот мой код:

from twisted.internet import reactor
from txjsonrpc.netstring import jsonrpc
import weakref

creds  = {'user1':'pass1','user2':'pass2','user3':'pass3'}

class arduinoRPC(jsonrpc.JSONRPC):
    def connectionMade(self):
        pass

    def jsonrpc_identify(self,username,password,mac):
        """ Each client must be authenticated just after to be connected calling this rpc """
        if creds.has_key(username):
            if creds[username] == password:
                authenticated = True
            else:
                authenticated = False
        else:
            authenticated = False

        if authenticated:
            self.factory.clients.append(self)
            self.factory.references[mac] = weakref.ref(self)
            return {'results':'Authenticated as %s'%username,'error':None}
        else:
            self.transport.loseConnection()

    def jsonrpc_sync_acq(self,data,f):
        """Save into django table data acquired from sensors and send ack to gateway"""
        if not (self in self.factory.clients):
            self.transport.loseConnection()
        print f
        return {'results':'synced %s records'%len(data),'error':'null'}

    def connectionLost(self, reason):
        """ mac address is searched and all reference to self.factory.clientes are erased """  
        for mac in self.factory.references.keys():
            if self.factory.references[mac]() == self:
                print 'Connection closed - Mac address: %s'%mac
                del self.factory.references[mac]
                self.factory.clients.remove(self)


class rpcfactory(jsonrpc.RPCFactory):
    protocol = arduinoRPC
    def __init__(self, maxLength=1024):
        self.maxLength = maxLength
        self.subHandlers = {}
        self.clients    =   []
        self.references =   {}

""" Asynchronous remote calling to micros, simulating random calling from server """
import threading,time,random,netstring,json
class asyncGatewayCalls(threading.Thread):
    def __init__(self,rpcfactory):
        threading.Thread.__init__(self)
        self.rpcfactory =   rpcfactory
        """identifiers of each micro/client connected"""
        self.remoteMacList    =   ['12:23:23:23:23:23:23','167:67:67:67:67:67:67','90:90:90:90:90:90:90']
    def run(self):
        while True:
            time.sleep(10)
            while True:
                """ call to any of three potential micros connected """ 
                mac = self.remoteMacList[random.randrange(0,len(self.remoteMacList))]
                if self.rpcfactory.references.has_key(mac):
                    print 'Calling %s'%mac
                    proto   =   self.rpcfactory.references[mac]()
                    """ requesting echo from selected micro"""
                    dataToSend  = netstring.encode(json.dumps({'method':'echo_from_micro','params':['plop']}))
                    proto.transport.write(dataToSend)
                    break

factory = rpcfactory(arduinoRPC)

"""start thread caller""" 
r=asyncGatewayCalls(factory)
r.start()

reactor.listenTCP(7080, factory)
print "Micros remote RPC server started"
reactor.run()
Это было полезно?

Решение

Вам нужно добавить достаточно информации в каждое сообщение, чтобы получатель мог определить, как его интерпретировать. Ваши требования очень похоже на требования Усилитель, чтобы вместо этого вы могли бы использовать AMP, либо использовать ту же структуру, что и усилитель для идентификации ваших сообщений. Конкретно:

  • В запросах поместите конкретный ключ - например, AMP использует «_ask» для идентификации запросов. Это также дает им уникальное значение, которое дополнительно идентифицирует этот запрос на срок службы подключения.
  • В ответах поместите другой ключ - например, AMP использует «_answer» для этого. Значение соответствует значению из ключа «_ask» в запросе, для которого предназначен ответ.

Используя такой подход, вам просто нужно посмотреть, есть ли ключ «_ask» или ключ «_answer», чтобы определить, получили ли вы новый запрос или ответ на предыдущий запрос.

По отдельной теме, ваш asyncGatewayCalls Класс не должен быть на основе потока. Нет никакой очевидной причины для его использования потоков, и тем самым это также неправильно использует скрученные API, что приведет к неопределенному поведению. Большинство скрученных API могут использоваться только в потоке, в которой вы звонили reactor.run. Анкет Единственное исключение - это reactor.callFromThread, который вы можете использовать для отправки сообщения в потоку реактора из любого другого потока. asyncGatewayCalls Однако пытается написать на транспорт, что приведет к коррупции буфера или произвольному задержению в отправленных данных, или, возможно, худшим вещам. Вместо этого вы можете написать asyncGatewayCalls как это:

from twisted.internet.task import LoopingCall

class asyncGatewayCalls(object):
    def __init__(self, rpcfactory):
        self.rpcfactory = rpcfactory
        self.remoteMacList = [...]

    def run():
        self._call = LoopingCall(self._pokeMicro)
        return self._call.start(10)

    def _pokeMicro(self):
        while True:
            mac = self.remoteMacList[...]
            if mac in self.rpcfactory.references:
                proto = ...
                dataToSend = ...
                proto.transport.write(dataToSend)
                break

factory = ...
r = asyncGatewayCalls(factory)
r.run()

reactor.listenTCP(7080, factory)
reactor.run()

Это дает вам однопоточное решение, которое должно иметь то же поведение, что и вы намеревались для оригинала asyncGatewayCalls учебный класс. Вместо того, чтобы спать в цикле в потоке, чтобы запланировать вызовы, он использует API планирования расписания реактора (с помощью класса LoopingCall более высокого уровня, который планирует называться многократно), чтобы убедиться _pokeMicro называется каждые десять секунд.

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top