Twisted: perché è che il superamento di un callback differita ad una discussione differita rende il filo bloccare tutto ad un tratto?

StackOverflow https://stackoverflow.com/questions/2466000

Domanda

Ho provato ad utilizzare, senza successo, txredis (l'API ritorto non bloccante per Redis) per una coda di messaggi persistente Sto cercando di creare con un progetto Scrapy sto lavorando. Ho trovato che, anche se il cliente non è stato il blocco, è diventato molto più lento di quello che avrebbe potuto essere, perché quello che doveva essere un evento nel ciclo reattore è stato suddiviso in migliaia di passi.

Così, invece, ho provato a fare uso di Redis-py (il regolare blocco api ritorto) e avvolgendo la chiamata in un thread differito. E le grandi opere, ma voglio eseguire un interno differita quando faccio una chiamata a Redis come vorrei per impostare il pool di connessioni nel tentativo di accelerare le cose ulteriormente.

Di seguito è la mia interpretazione di alcuni esempi di codice preso dalla documentazione intrecciati per un thread differita per illustrare il mio caso d'uso:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

Ed ecco la mia alterazione per il pool di connessioni che rende il codice del blocco filo differita:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Quindi la mia domanda è, qualcuno sa il motivo per cui la mia alterazione provoca il filo differita da bloccando e / o qualcuno può suggerire una soluzione migliore?

È stato utile?

Soluzione

Bene, come il docs intrecciati dicono:

  

Deferreds non fanno il codice   magicamente non bloccare

Ogni volta che si utilizza il codice di blocco, come ad esempio sleep, si deve rinviare ad un nuovo thread.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Nel caso in cui l'API Redis non è molto complesso potrebbe essere più naturale di riscrivere usando twisted.web, invece di chiamare l'API di blocco in un sacco di discussioni.

Altri suggerimenti

C'è anche un client di up-to-date Redis per contorto, che già supporta il nuovo protocollo e le caratteristiche di Redis 2.x. Si dovrebbe sicuramente fare un tentativo. Si chiama txredisapi.

Per la coda di messaggi persistente, mi consiglia RestMQ. Un sistema di coda di messaggi Redis-based costruita sulla parte superiore del ciclone e txredisapi.

http://github.com/gleicon/restmq

Saluti

In una nota correlata, probabilmente si potrebbe guadagnare molto utilizzando un client Redis creato appositamente per Twisted, come questa: http://github.com/deldotdr/txRedis

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top