变成了:那为什么传递延迟回调到递延线程使线程阻塞突然?
-
20-09-2019 - |
题
我使用txredis(对于redis的非阻挡扭曲API)的一个持久消息队列我试图建立与我的工作一个scrapy项目不成功尝试。我发现,虽然客户没有隔断,成为远远低于它可能是因为什么应该是在反应器循环一个事件被分成数千个步骤。
所以不是,我试图利用redis的-PY的(经常阻塞扭曲API)和包裹在一个延迟线程的调用。它的伟大工程,但我想,当我拨打电话到Redis的,因为我想建立连接池的尝试,以进一步加快速度进行内部延迟。
下面是我的从绞合文档采取延期螺纹一些示例代码来说明我的用例的解释:
#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall():
print 'doing lookup... this may take a while'
time.sleep(10)
return 'results from redis'
def result(res):
print res
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = threads.deferToThread(aBlockingRedisCall)
d.addCallback(result)
reactor.run()
if __name__=='__main__':
main()
这是我的连接池改造,使递延线程阻塞的代码:
#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
time.sleep(10) # this is now blocking.. any ideas?
d = defer.Deferred()
d.addCallback(gotFinalResult)
d.callback(x)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
所以我的问题是,没有人知道为什么我的改变导致的递延线程会阻止和/或任何人都可以提出一个更好的解决方案吗?
解决方案
好视扭曲文档说:
Deferreds不使代码 神奇不会阻止
每当你使用的是阻塞的代码,如sleep
,你必须把它推迟到一个新的线程。
#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
def getstuff( x ):
time.sleep(3)
return "stuff is %s" % x
# getstuff is blocking, so you need to push it to a new thread
d = threads.deferToThread(getstuff, x)
d.addCallback(gotFinalResult)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
在的情况下redis的API不是很复杂,可能更自然使用twisted.web重写它,而不是仅仅调用阻挡API中很多的螺纹。
其他提示
还有用于扭曲向上最新Redis的客户端,它已经支持Redis的2.x的新协议和功能你应该definetely试一试。这就是所谓的txredisapi。
有关的持久消息队列,我建议RestMQ。建立在旋风分离器和txredisapi的顶部基于redis的消息队列系统。
http://github.com/gleicon/restmq
干杯
在一个相关的说明,你很可能通过使用专门为扭转创建Redis的客户端,比如这个收获了很多:的 http://github.com/deldotdr/txRedis
不隶属于 StackOverflow