Question

I'm trying to use alchimia to get an asynchronous API for the DB. I'm trying to make a simple request to the DB, like this:

    def authorization(self, data):
        """
            Check the user named in data['user'] against the DB.

            Builds a SELECT on Users filtered by name, passes the outcome to
            commands.AUTH together with data, logs the message AUTH returns
            and returns the data AUTH returns.
        """

        # The body contains a yield, so calling __gotResult() does not run
        # the query: it only creates a generator object.
        def __gotResult(user):
            yield engine.execute(sqlalchemy.select([Users]).where(Users.name == user))

        # result is therefore a generator, not a row and not a Deferred.
        result = __gotResult(data['user'])
        log.msg("[AUTH] User=%s trying to auth..." % data['user'])
        # commands.AUTH receives the generator; indexing it there is what
        # raises the TypeError shown in the traceback.
        data, result_msg = commands.AUTH(result, data)
        log.msg(result_msg)
        return data

And I can't understand what I'm doing wrong. Maybe the issue is in the options for the engine (where reactor=[])?

Source code:

import sys
from json import dumps, loads

import sqlalchemy
from twisted.internet import reactor, ssl
from twisted.python import log, logfile
from twisted.web.server import Site
from twisted.web.static import File
from autobahn.twisted.websocket import WebSocketServerFactory, WebSocketServerProtocol, listenWS

import commands
from db.tables import Users
from alchimia import TWISTED_STRATEGY


# Send Twisted log output to service.log in the current directory.
log_file = logfile.LogFile("service.log", ".")
log.startLogging(log_file)
# Module-level engine created with alchimia's TWISTED_STRATEGY; it is used by
# DFSServerProtocol.authorization below.
# NOTE(review): alchimia's documented usage passes the Twisted reactor object
# (reactor=reactor) for this option rather than an empty list - confirm.
engine = sqlalchemy.create_engine('postgresql://test:test@localhost/testdb', pool_size=20, max_overflow=0,strategy=TWISTED_STRATEGY, reactor=[])


class DFSServerProtocol(WebSocketServerProtocol):
    """
        WebSocket protocol that decodes JSON messages and dispatches them
        to per-command handlers.
    """

    # Class attribute bound to commands_user from the commands module.
    commands = commands.commands_user

    # NOTE(review): the base class initializer is not called here - confirm
    # that WebSocketServerProtocol does not rely on it.
    def __init__(self):
        self.commands_handlers = self.__initHandlersUser()

    def __initHandlersUser(self):
        """
            Return the command -> handler table used by onMessage
        """
        # No copy is made: the assignments below modify the object owned by
        # the commands module (presumably a dict shared by every connection).
        handlers = commands.commands_handlers_server
        handlers['AUTH'] = self.authorization
        # None marks a command that has no handler yet; onMessage answers
        # such commands with an error message.
        handlers['READ'] = None
        handlers['WRTE'] = None
        handlers['DELT'] = None
        handlers['RNME'] = None
        handlers['SYNC'] = None
        handlers['LIST'] = None
        return handlers

    def authorization(self, data):
        """
            Check the user named in data['user'] against the DB.

            Passes the query outcome and data to commands.AUTH, logs the
            message AUTH returns and returns the data AUTH returns.
        """

        # The body contains a yield, so calling __gotResult() only creates a
        # generator object; the query expression is not evaluated here.
        # The user parameter is unused: the filter reads data['user'] from
        # the enclosing scope instead.
        def __gotResult(user):
            yield engine.execute(sqlalchemy.select([Users]).where(Users.name == data['user']))

        # result is a generator, not a row and not a Deferred.
        result = __gotResult(data['user'])
        log.msg("[AUTH] User=%s trying to auth..." % data['user'])
        # commands.AUTH receives the generator; indexing it there is what
        # raises the TypeError shown in the traceback.
        data, result_msg = commands.AUTH(result, data)
        log.msg(result_msg)
        return data

    def onMessage(self, payload, isBinary):
        """
            Handle one incoming message: decode the JSON payload, run the
            requested command and send the resulting JSON back.

            isBinary is accepted but not used.
        """

        json_data = loads(payload)
        json_auth = json_data['auth']
        json_cmd  = json_data['cmd']

        # Not yet authorized: only AUTH is dispatched; any other command
        # leaves json_data unchanged and it is sent back as received.
        if json_auth == False:
            if json_cmd == 'AUTH':
                json_data = self.commands_handlers['AUTH'](json_data)
        # for authorized users
        else:
            if json_cmd in commands.commands_user.keys():
                # A handler of None means the command is known but has no
                # implementation yet.
                if self.commands_handlers[json_cmd] is not None:
                    json_data = self.commands_handlers[json_cmd](json_data)
                else:
                    json_data['error'] = '%s command is not already realized...' % json_cmd
            else:
                # Unknown command: the auth flag is reset and an error reported.
                json_data['auth'] = False
                json_data['error'] = 'This command is not supported on server...'
        response = dumps(json_data)
        self.sendMessage(str(response))

# Script entry point: starts the secure WebSocket server and an HTTPS static
# file server, then runs the reactor.
if __name__ == '__main__':
    # A first command-line argument of 'debug' also starts logging to stdout
    # and switches on the factory's debug flags below.
    if len(sys.argv) > 1 and sys.argv[1] == 'debug':
        log.startLogging(sys.stdout)
        debug = True
    else:
        debug = False

    # One SSL context (server key + certificate) is used by both listeners.
    contextFactory = ssl.DefaultOpenSSLContextFactory('keys/server.key', 'keys/server.crt')

    # WebSocket endpoint; each connection is handled by a DFSServerProtocol.
    factory = WebSocketServerFactory("wss://localhost:9000", debug = debug, debugCodePaths = debug)
    factory.protocol = DFSServerProtocol
    factory.setProtocolOptions(allowHixie76 = True)

    listenWS(factory, contextFactory)

    # Static files from ./web/, with a content type registered for .crt files.
    webdir = File("./web/")
    webdir.contentTypes['.crt'] = 'application/x-x509-ca-cert'
    web = Site(webdir)

    # The static site is served over SSL on port 8080.
    reactor.listenSSL(8080, web, contextFactory)
    #reactor.listenTCP(8080, web)

    reactor.run()

Traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/log.py", line 88, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/log.py", line 73, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/posixbase.py", line 614, in _doReadOrWrite
    why = selectable.doRead()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/tcp.py", line 215, in doRead
    return self._dataReceived(data)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/tcp.py", line 221, in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "/usr/local/lib/python2.7/dist-packages/twisted/protocols/tls.py", line 419, in dataReceived
    self._flushReceiveBIO()
  File "/usr/local/lib/python2.7/dist-packages/twisted/protocols/tls.py", line 389, in _flushReceiveBIO
    ProtocolWrapper.dataReceived(self, bytes)
  File "/usr/local/lib/python2.7/dist-packages/twisted/protocols/policies.py", line 120, in dataReceived
    self.wrappedProtocol.dataReceived(data)
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/websocket.py", line 78, in dataReceived
    self._dataReceived(data)
  File "/usr/local/lib/python2.7/dist-packages/autobahn/websocket/protocol.py", line 1270, in _dataReceived
    self.consumeData()
  File "/usr/local/lib/python2.7/dist-packages/autobahn/websocket/protocol.py", line 1286, in consumeData
    while self.processData() and self.state != WebSocketProtocol.STATE_CLOSED:
  File "/usr/local/lib/python2.7/dist-packages/autobahn/websocket/protocol.py", line 1445, in processData
    return self.processDataHybi()
  File "/usr/local/lib/python2.7/dist-packages/autobahn/websocket/protocol.py", line 1758, in processDataHybi
    fr = self.onFrameEnd()
  File "/usr/local/lib/python2.7/dist-packages/autobahn/websocket/protocol.py", line 1887, in onFrameEnd
    self._onMessageEnd()
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/websocket.py", line 107, in _onMessageEnd
    self.onMessageEnd()
  File "/usr/local/lib/python2.7/dist-packages/autobahn/websocket/protocol.py", line 734, in onMessageEnd
    self._onMessage(payload, self.message_is_binary)
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/websocket.py", line 110, in _onMessage
    self.onMessage(payload, isBinary)
  File "server.py", line 84, in onMessage
    json_data = self.commands_handlers['AUTH'](json_data)
  File "server.py", line 68, in authorization
    data, result_msg = commands.AUTH(result, data)
  File "/home/relrin/code/Helenae/helenae/commands.py", line 68, in AUTH
    if result['name'] == data['user']:
exceptions.TypeError: 'generator' object has no attribute '__getitem__'
Was it helpful?

Solution

I think you are missing an @inlineCallbacks around __gotResult(). That might not help you quite enough, though, since a single-statement generator wrapped with inlineCallbacks is sort of pointless. You should get used to working with explicit deferred handling anyway. Let's pull this apart:

def authorization(self, data):
    """
        Checking user with DB

        Starts the query through engine.execute and returns its Deferred.
        Once the query completes, the Deferred fires with the data returned
        by commands.AUTH; callers must add their own callbacks to it.
    """
    # engine.execute already gives us a deferred, so we grab on to that.
    user = data['user']
    result_d = engine.execute(sqlalchemy.select([Users]).where(Users.name == user))

    # We don't have the result inside authorization itself, so any code that
    # works with the result has to be wrapped in a callback.
    def result_cb(result):
        # Bind AUTH's return value to a new name: assigning to `data` here
        # would make `data` local to result_cb, and reading it in the call
        # below would raise UnboundLocalError.
        # NOTE(review): with alchimia the deferred presumably fires with a
        # result proxy rather than a row; if commands.AUTH indexes a row
        # (result['name']), a fetchone() step may be needed first - confirm.
        auth_data, result_msg = commands.AUTH(result, data)
        return auth_data

    result_d.addCallback(result_cb)

    # We want to pass the (asynchronous) result out; it is hiding in our
    # deferred, so we return *that* instead.
    return result_d

If you insist; we can squish this down into an inline callbacks form:

from twisted.internet.defer import inlineCallbacks, returnValue

@inlineCallbacks
def authorization(self, data):
    """
        Checking user with DB (inlineCallbacks form)

        Returns a Deferred that fires with the data returned by
        commands.AUTH once the query issued through engine.execute completes.
    """
    user = data['user']
    # Execution is suspended at this yield until the deferred returned by
    # engine.execute fires; its result is then bound to `result`.
    result = yield engine.execute(sqlalchemy.select([Users]).where(Users.name == user))
    data, result_msg = commands.AUTH(result, data)
    # returnValue() delivers the value by raising, so it is called as a
    # plain statement; a `yield` in front of it would never be reached.
    returnValue(data)

As before, though, authorization() is asynchronous, and it must be, since engine.execute is async. To use it, you must attach a callback to the deferred it returns (although the caller may also yield it if the caller is itself wrapped in inlineCallbacks).

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top