Скрученный:почему передача отложенного обратного вызова отложенному потоку приводит к внезапной блокировке потока?
-
20-09-2019 - |
Вопрос
Я безуспешно пытался использовать txredis (неблокирующий twisted api для redis) для постоянной очереди сообщений, которую я пытаюсь настроить с помощью проекта scrapy, над которым я работаю.Я обнаружил, что, хотя клиент не блокировался, он стал намного медленнее, чем мог бы быть, потому что то, что должно было быть одним событием в цикле реактора, было разделено на тысячи шагов.
Поэтому вместо этого я попытался использовать redis-py (обычный блокирующий twisted api) и обернуть вызов в отложенный поток.Это отлично работает, однако я хочу выполнить внутреннюю отсрочку при вызове redis, поскольку я хотел бы настроить пул соединений в попытках еще больше ускорить процесс.
Ниже приведена моя интерпретация некоторого примера кода, взятого из twisted docs для отложенного потока, чтобы проиллюстрировать мой вариант использования:
#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall():
print 'doing lookup... this may take a while'
time.sleep(10)
return 'results from redis'
def result(res):
print res
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = threads.deferToThread(aBlockingRedisCall)
d.addCallback(result)
reactor.run()
if __name__=='__main__':
main()
И вот мое изменение для пула соединений, которое блокирует код в отложенном потоке :
#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
time.sleep(10) # this is now blocking.. any ideas?
d = defer.Deferred()
d.addCallback(gotFinalResult)
d.callback(x)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
Итак, мой вопрос в том, знает ли кто-нибудь, почему мое изменение приводит к блокировке отложенного потока и / или кто-нибудь может предложить лучшее решение?
Решение
Ну, а поскольку скрученные документы сказать:
Отсрочки не приводят к созданию кода волшебным образом не блокируют
Всякий раз, когда вы используете блокирующий код, например sleep
, вы должны отложить это до нового потока.
#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
def getstuff( x ):
time.sleep(3)
return "stuff is %s" % x
# getstuff is blocking, so you need to push it to a new thread
d = threads.deferToThread(getstuff, x)
d.addCallback(gotFinalResult)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
В случае, если redis api не очень сложный, было бы естественнее переписать его с помощью twisted.web, вместо того, чтобы просто вызывать блокирующий API в большом количестве потоков.
Другие советы
Также имеется обновленный клиент Redis для Twisted, который уже поддерживает новый протокол и функции Redis 2.x.Вам обязательно стоит попробовать.Это называется txredisapi.
Для постоянной очереди сообщений я бы рекомендовал RestMQ.Система очередей сообщений на основе Redis, построенная на основе Cyclone и txredisapi.
http://github.com/gleicon/restmq
Ваше здоровье
Кстати, вы, вероятно, могли бы многого добиться, используя клиент Redis, созданный специально для Twisted, например этот: http://github.com/deldotdr/txRedis