Domanda

I have Flask application and implemented RPC in it.

engine.py

import pickle
import pika
from databasyfacade.rpc import api

__author__ = 'Marboni'

def on_request(ch, method, props, body):
    request = pickle.loads(body)
    func = getattr(api, request['func'])
    try:
        result = func(*request['args'])
    except Exception, e:
        response = {
            'status': 'ERROR',
            'error': e
        }
    else:
        response = {
            'status': 'OK',
            'result': result
        }

    ch.basic_publish(exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        ),
        body=pickle.dumps(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)


def init(host):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))

    channel = connection.channel()
    channel.queue_declare(queue='facade_rpc')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='facade_rpc')

api.py

from databasyfacade.services import profiles_service

__author__ = 'Marboni'

def profile(user_id):
    return profiles_service.profile(user_id)

When Flask application initializes, it runs method init(host).


Now I need to test how my application responses to RPC calls. So I wrote following RPC client:

client.py

import pickle
import uuid
import pika

__author__ = 'Marboni'

class RpcClient(object):
    def __init__(self, host):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)

        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)


    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body


    def call(self, func, *args):
        request = {
            'func': func,
            'args': args
        }

        self.response = None
        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange='',
            routing_key='facade_rpc',
            properties=pika.BasicProperties(
                reply_to = self.callback_queue,
                correlation_id = self.corr_id,
            ),
            body=pickle.dumps(request))

        while self.response is None:
            self.connection.process_data_events()
        response = pickle.loads(self.response)
        if response['status'] == 'ERROR':
            e = response['error']
            raise e
        else:
            return response['result']

Then I wrote test based on Flask-Testing framework. It initializes Flask application between each test method, so we can interact with it.

tests.py

from databasyfacade.rpc import RpcClient
from databasyfacade.testing import DatabasyTest, fixtures
from databasyfacade.testing.testdata import UserData, ProfileData

__author__ = 'Marboni'

class RpcTest(DatabasyTest):
    @fixtures(UserData, ProfileData)
    def test_profile(self, data):
        rpc = RpcClient(self.app.config['RABBITMQ_HOST'])
        profile = rpc.call('profile', ProfileData.hero.user_id)
        self.assertIsNotNone(profile)
        self.assertEqual(ProfileData.hero.email, profile.email)

This test hangs when makes call. It iterates infinitely here:

From client.py

while self.response is None:
    self.connection.process_data_events()

It means that on_response() method on client never called.

If I interrupt my tests with CTRL-C, I will see following stacktrace:

Traceback (most recent call last):
  File "../env/bin/nosetests", line 8, in <module>
    load_entry_point('nose==1.3.0', 'console_scripts', 'nosetests')()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 118, in __init__
    **extra_args)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/unittest/main.py", line 95, in __init__
    self.runTests()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 197, in runTests
    result = self.testRunner.run(self.test)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 61, in run
    test(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/unittest/suite.py", line 65, in __call__
    return self.run(*args, **kwds)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 74, in run
    test(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
    return self.run(*arg, **kw)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
    test(orig)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 45, in __call__
    return self.run(*arg, **kwarg)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 133, in run
    self.runTest(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 151, in runTest
    test(result)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/flask_testing.py", line 72, in __call__
    self._pre_setup()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/flask_testing.py", line 80, in _pre_setup
    self.app = self.create_app()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/testing/__init__.py", line 12, in create_app
    return app.create_app()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/app.py", line 75, in create_app
    init_rpc(app)
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/app.py", line 63, in init_rpc
    rpc.init(app.config['RABBITMQ_HOST'])
  File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/rpc/engine.py", line 39, in init
    channel.basic_consume(on_request, queue='facade_rpc')
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/channel.py", line 220, in basic_consume
    {'consumer_tag': consumer_tag})])
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1104, in _rpc
    self._wait_on_response(method_frame))
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1124, in _send_method
    self.connection.process_data_events()
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 215, in process_data_events
    if self._handle_read():
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 327, in _handle_read
    if self._read_poller.ready():
  File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 66, in ready
    self.poll_timeout)
KeyboardInterrupt

I tried to run application and access it from separate script:

#!/usr/bin/env python
from databasyfacade.rpc.client import RpcClient

rpc = RpcClient('localhost')

profile = rpc.call('profile', 4L)
print profile.email

As you can see, code is the same as in test, but in this case it works.

What can be the cause of this issue? May be, it's because Flask-Testing runs both application and client in one process? How to check it / write correct test?

È stato utile?

Soluzione

I found the cause, it's not related with MQ. Remove method accessed database using SQLAlchemy with scoped_session. Problem disappeared after I finalized session properly:

Session.remove()
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top