我创建了一个简单的 RPC 服务器执行我们团队共有的某些任务,但它们是从不同网络调用的。服务器看起来像这样(我不包括为简短处理错误处理):

from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor
import json

class MyProtocol(Protocol):
    def dataReceived(self, data):
        req = json.loads(data) # create a dictionary from JSON string
        method = getattr(self, req['method']) # get the method
        method(req['params']) # call the method

    def add(self, params):
        result = {} # initialize a dictionary to convert later to JSON
        result['result'] = sum(params) 
        result['error'] = None 
        result['id'] = 1
        self.transport.write(json.dumps(result)) # return a JSON string
        self.transport.loseConnection() # close connection

factory = Factory()
factory.protocol = MyProtocol
reactor.listenTCP(8080, factory)
reactor.run()

这很简单:服务器从客户端接收JSON RPC请求,寻找方法,并调用传递参数的方法。该方法本身是返回JSON RPC响应的方法。对于不太熟悉的 JSON RPC 看起来像这样:

request:
{"method":"my_method", "params":[1,2,3], "id":"my_id"}
response:
{"result":"my_result", "error":null, "id":"my_id"}

我拥有的RPC服务器非常适合我当前的目的(您可以想象,我的任务非常简单)。但是,随着任务的复杂性的增加,我将需要继续添加方法。

我不想打开主文件并添加另一个 def method3(...) 而且,两个星期后,请添加 def method4(...) 等等;该代码将太快增长,维护将越来越困难。

所以,我的问题是: 我如何创建一个允许我的体系结构 登记 服务器的方法. 。一个奖励是让一个单独的文件夹按每个方法持有一个文件,以便可以轻松地共享和维护它们。这种“体系结构”还可以使我能够将某些方法的维护推迟给其他人,无论他们对扭曲的理解如何。

我不在乎我是否需要每次注册新方法时都需要重新启动服务器,但是显而易见的是如果我 也有 :)。

谢谢。

有帮助吗?

解决方案

有点大的顺序;)但是,这里有一些初始步骤(在示例中非常嘲笑,扭曲的细节):

# your twisted imports...
import json

class MyProtocol(object): # Would be Protocol instead of object in real code

    def dataReceived(self, data):
        req = json.loads(data) # create a dictionary from JSON string
        modname, funcname = req['method'].split('.')
        m = __import__(modname)
        method = getattr(m, funcname) # get the method
        method(self, req['params']) # call the method

假设您像我们执行此操作一样尝试一下:

mp = MyProtocol()
mp.dataReceived('{"method":"somemod.add", "params":[1,2,3]}')

你有一个模块 somemod.py 在与以下内容的示例相同的目录中(镜像示例方法 .add() 多于):

import json

def add(proto, params):
    result = {} # initialize a dictionary to convert later to JSON
    result['result'] = sum(params)
    result['error'] = None
    result['id'] = 1
    proto.transport.write(json.dumps(result)) # return a JSON string
    proto.transport.loseConnection() # close connection

这使您可以使用每个方法拥有一个模块。这 method(.. 上面的电话将永远通过 MyProtocol 实例到服务可召唤。 (如果您真的想要实例方法,则此下是有关如何使用Python添加方法的说明: http://irrepupavel.com/documents/python/instancemethod/ )

您将需要添加很多错误处理。例如,您需要在 split() 在第2行上致电 dataReceived().

有了这个,您可以将其中一个函数的单独文件用于您需要支持的每种方法。绝不是一个完整的例子,但它可能会让您前进,因为您的寻找的是非常复杂的。

对于更正式的注册,我建议 dictMyProtocol 带有您支持的方法的名称,沿着:

# in MyProtocol's __init__() method:
self.methods = {}

和寄存器方法。

def register(self, name, callable):
    self.methods[name] = callable

..调整 dataReceived()..

def dataReceived(self, data):
    # ...
    modname, funcname = self.methods.get(req['method'], False)
    # ..continue along the lines of the dataReceived() method above

快速摘要太长了: __import__ 功能 ( http://docs.python.org/library/functions.html )肯定是您解决方案的关键部分。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top