I found the cause, it's not related with MQ. Remove method accessed database using SQLAlchemy with scoped_session. Problem disappeared after I finalized session properly:
Session.remove()
Вопрос
I have Flask application and implemented RPC in it.
engine.py
import pickle
import pika
from databasyfacade.rpc import api
__author__ = 'Marboni'
def on_request(ch, method, props, body):
request = pickle.loads(body)
func = getattr(api, request['func'])
try:
result = func(*request['args'])
except Exception, e:
response = {
'status': 'ERROR',
'error': e
}
else:
response = {
'status': 'OK',
'result': result
}
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id=props.correlation_id
),
body=pickle.dumps(response)
)
ch.basic_ack(delivery_tag=method.delivery_tag)
def init(host):
connection = pika.BlockingConnection(pika.ConnectionParameters(host))
channel = connection.channel()
channel.queue_declare(queue='facade_rpc')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='facade_rpc')
api.py
from databasyfacade.services import profiles_service
__author__ = 'Marboni'
def profile(user_id):
return profiles_service.profile(user_id)
When Flask application initializes, it runs method init(host)
.
Now I need to test how my application responses to RPC calls. So I wrote following RPC client:
client.py
import pickle
import uuid
import pika
__author__ = 'Marboni'
class RpcClient(object):
def __init__(self, host):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, func, *args):
request = {
'func': func,
'args': args
}
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='facade_rpc',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=pickle.dumps(request))
while self.response is None:
self.connection.process_data_events()
response = pickle.loads(self.response)
if response['status'] == 'ERROR':
e = response['error']
raise e
else:
return response['result']
Then I wrote test based on Flask-Testing framework. It initializes Flask application between each test method, so we can interact with it.
tests.py
from databasyfacade.rpc import RpcClient
from databasyfacade.testing import DatabasyTest, fixtures
from databasyfacade.testing.testdata import UserData, ProfileData
__author__ = 'Marboni'
class RpcTest(DatabasyTest):
@fixtures(UserData, ProfileData)
def test_profile(self, data):
rpc = RpcClient(self.app.config['RABBITMQ_HOST'])
profile = rpc.call('profile', ProfileData.hero.user_id)
self.assertIsNotNone(profile)
self.assertEqual(ProfileData.hero.email, profile.email)
This test hangs when makes call. It iterates infinitely here:
From client.py
while self.response is None:
self.connection.process_data_events()
It means that on_response()
method on client never called.
If I interrupt my tests with CTRL-C, I will see following stacktrace:
Traceback (most recent call last):
File "../env/bin/nosetests", line 8, in <module>
load_entry_point('nose==1.3.0', 'console_scripts', 'nosetests')()
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 118, in __init__
**extra_args)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/unittest/main.py", line 95, in __init__
self.runTests()
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 197, in runTests
result = self.testRunner.run(self.test)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/core.py", line 61, in run
test(result)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
return self.run(*arg, **kw)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
test(orig)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/unittest/suite.py", line 65, in __call__
return self.run(*args, **kwds)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 74, in run
test(result)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
return self.run(*arg, **kw)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
test(orig)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
return self.run(*arg, **kw)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
test(orig)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
return self.run(*arg, **kw)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
test(orig)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
return self.run(*arg, **kw)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
test(orig)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 176, in __call__
return self.run(*arg, **kw)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/suite.py", line 223, in run
test(orig)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 45, in __call__
return self.run(*arg, **kwarg)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 133, in run
self.runTest(result)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/nose/case.py", line 151, in runTest
test(result)
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/flask_testing.py", line 72, in __call__
self._pre_setup()
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/flask_testing.py", line 80, in _pre_setup
self.app = self.create_app()
File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/testing/__init__.py", line 12, in create_app
return app.create_app()
File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/app.py", line 75, in create_app
init_rpc(app)
File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/app.py", line 63, in init_rpc
rpc.init(app.config['RABBITMQ_HOST'])
File "/Users/Marboni/Projects/Databasy/databasy-facade/databasyfacade/databasyfacade/rpc/engine.py", line 39, in init
channel.basic_consume(on_request, queue='facade_rpc')
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/channel.py", line 220, in basic_consume
{'consumer_tag': consumer_tag})])
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1104, in _rpc
self._wait_on_response(method_frame))
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1124, in _send_method
self.connection.process_data_events()
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 215, in process_data_events
if self._handle_read():
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 327, in _handle_read
if self._read_poller.ready():
File "/Users/Marboni/Projects/Databasy/databasy-facade/env/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 66, in ready
self.poll_timeout)
KeyboardInterrupt
I tried to run application and access it from separate script:
#!/usr/bin/env python
from databasyfacade.rpc.client import RpcClient
rpc = RpcClient('localhost')
profile = rpc.call('profile', 4L)
print profile.email
As you can see, code is the same as in test, but in this case it works.
What can be the cause of this issue? May be, it's because Flask-Testing runs both application and client in one process? How to check it / write correct test?
Решение
I found the cause, it's not related with MQ. Remove method accessed database using SQLAlchemy with scoped_session. Problem disappeared after I finalized session properly:
Session.remove()