Retorcido:¿Por qué pasar una devolución de llamada diferida a un hilo diferido hace que el hilo se bloquee de repente?

StackOverflow https://stackoverflow.com/questions/2466000

Pregunta

Intenté sin éxito usar txredis (la API retorcida sin bloqueo para redis) para una cola de mensajes persistentes que estoy intentando configurar con un proyecto scrapy en el que estoy trabajando.Descubrí que, aunque el cliente no se estaba bloqueando, se volvió mucho más lento de lo que podría haber sido porque lo que debería haber sido un evento en el circuito del reactor se dividió en miles de pasos.

Entonces, en lugar de eso, intenté usar redis-py (la API retorcida de bloqueo normal) y envolver la llamada en un hilo diferido.Funciona muy bien, sin embargo, quiero realizar un aplazamiento interno cuando hago una llamada a Redis, ya que me gustaría configurar la agrupación de conexiones en un intento de acelerar aún más las cosas.

A continuación se muestra mi interpretación de un código de muestra tomado de los documentos retorcidos para un hilo diferido para ilustrar mi caso de uso:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

Y aquí está mi modificación para la agrupación de conexiones que hace que el código en el hilo diferido se bloquee:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Entonces mi pregunta es, ¿alguien sabe por qué mi alteración hace que el hilo aplazado se bloquee y/o alguien puede sugerir una solución mejor?

¿Fue útil?

Solución

Bueno, como documentos torcidos del dicen:

  

Deferreds no hacen el código   mágicamente no bloquear

Siempre que estés utilizando el código de bloqueo, como sleep, usted tiene que aplazar a un nuevo hilo.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

En caso de que el API Redis no es muy compleja que podría ser más natural a volver a escribir usando twisted.web, en lugar de llamar a la API de bloqueo en muchos temas.

Otros consejos

También hay un cliente Redis actualizado para Twisted que ya admite el nuevo protocolo y las características de Redis 2.x.Definitivamente deberías intentarlo.Se llama txredisapi.

Para la cola de mensajes persistentes, recomendaría RestMQ.Un sistema de cola de mensajes basado en Redis construido sobre cyclone y txredisapi.

http://github.com/gleicon/restmq

Salud

En una nota relacionada, que probablemente podría ganar mucho mediante el uso de un cliente Redis creada específicamente para Twisted, como este: http://github.com/deldotdr/txRedis

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top