Twisted: Por que passar um retorno de chamada diferido a um fio diferido faz com que o fio bloqueie de repente?

StackOverflow https://stackoverflow.com/questions/2466000

Pergunta

Eu tentei, sem sucesso, usando o TXredis (a API torcida sem bloqueio para Redis) para uma fila de mensagens persistentes que estou tentando configurar com um projeto de scrapy em que estou trabalhando. Descobri que, embora o cliente não estivesse bloqueando, ficou muito mais lento do que poderia ter sido, porque o que deveria ter sido um evento no loop do reator foi dividido em milhares de etapas.

Então, em vez disso, tentei usar o Redis-Py (a API torcida de bloqueio regular) e envolver a chamada em um thread diferido. Funciona muito bem, no entanto, eu quero realizar um diferido interno quando faço uma chamada para Redis, pois gostaria de configurar o pool de conexões na tentativa de acelerar as coisas ainda mais.

Abaixo está minha interpretação de algum código de amostra retirado dos documentos distorcidos para um tópico diferido para ilustrar meu caso de uso:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

E aqui está minha alteração para o pool de conexões que faz o código no bloqueio do encadeamento diferido:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Então, minha pergunta é: alguém sabe por que minha alteração faz com que o fio diferido esteja bloqueando e/ou alguém pode sugerir uma solução melhor?

Foi útil?

Solução

Bem, como o documentos torcidos dizer:

Os diferidos não fazem o código não bloquear magicamente

Sempre que você está usando o código de bloqueio, como sleep, você precisa adiá -lo para um novo tópico.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Caso a API Redis não seja muito complexa, pode ser mais natural reescrevê -la usando Twisted.Web, em vez de apenas chamar a API de bloqueio em muitos threads.

Outras dicas

Há também um cliente Redis atualizado para Twisted, que já suporta o novo protocolo e os recursos do Redis 2.x. Você definitivamente deve tentar. Chama -se txredisapi.

Para a fila de mensagens persistente, eu recomendo RestMQ. Um sistema de fila de mensagens baseado em Redis, construído sobre o Cyclone e o TxredIsapi.

http://github.com/gleicon/restmq

Felicidades

Em uma nota relacionada, você provavelmente poderia ganhar muito usando um cliente Redis criado especificamente para Twisted, como este: http://github.com/deldotdr/txredis

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top