Twisted: pourquoi est-ce que le passage d'un rappel différé à un fil différé rend le fil bloquant tout d'un coup?
-
20-09-2019 - |
Question
Je vain essayé d'utiliser txredis (non bloquant api torsadée pour Redis) pour une file d'attente d'un message persistant que je suis en train de mettre en place un projet de scrapy je travaille. Je trouve que même si le client n'a pas le blocage, il est devenu beaucoup plus lent que cela aurait pu être parce que ce qui aurait dû être un événement dans la boucle du réacteur a été divisé en milliers d'étapes.
Ainsi, au lieu, j'essayé de faire usage de Redis-py (l'api tordu le blocage régulier) et enveloppant l'appel dans un fil différé. Il fonctionne très bien, mais je veux effectuer une différé intérieure quand je fais un appel à Redis que je voudrais mettre en place la mise en commun de connexion pour tenter d'accélérer les choses plus loin.
Voici mon interprétation de quelques exemples de code pris des docs tordues pour un fil différé pour illustrer mon cas d'utilisation:
#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall():
print 'doing lookup... this may take a while'
time.sleep(10)
return 'results from redis'
def result(res):
print res
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = threads.deferToThread(aBlockingRedisCall)
d.addCallback(result)
reactor.run()
if __name__=='__main__':
main()
Et voici mon changement pour la mise en commun de connexion qui rend le code dans le blocage de fil différé:
#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
time.sleep(10) # this is now blocking.. any ideas?
d = defer.Deferred()
d.addCallback(gotFinalResult)
d.callback(x)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
Alors ma question est, est-ce que quelqu'un sait pourquoi mon changement provoque le fil différé à bloquer et / ou peut-on proposer une meilleure solution?
La solution
Eh bien, comme docs tordus dire:
Deferreds ne font pas le code magie ne bloque pas
Chaque fois que vous utilisez le code de blocage, tels que sleep
, vous devez le remettre à un nouveau thread.
#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
def getstuff( x ):
time.sleep(3)
return "stuff is %s" % x
# getstuff is blocking, so you need to push it to a new thread
d = threads.deferToThread(getstuff, x)
d.addCallback(gotFinalResult)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
Dans le cas où l'api Redis est pas très complexe, il pourrait être plus naturel de le réécrire en utilisant twisted.web, au lieu d'appeler l'api de blocage dans un des fils du lot.
Autres conseils
Il y a aussi une mise à jour client Redis pour torsadés qui prend déjà en charge le nouveau protocole et les caractéristiques des Redis 2.x. Vous devriez definetely essayer. Il est appelé txredisapi.
Pour la file d'attente de messages persistants, je vous recommande RestMQ. Système de file d'attente message basé Redis construit au-dessus du cyclone et txredisapi.
http://github.com/gleicon/restmq
Vive
Sur le même sujet, vous pourriez probablement gagner beaucoup en utilisant un client Redis créé spécifiquement pour Twisted, comme celui-ci: http://github.com/deldotdr/txRedis