Verdreht: warum ist es, dass Leiten eines latenten Rückruf zu einer latenten Faden macht den Faden ganz plötzlich blockiert?

StackOverflow https://stackoverflow.com/questions/2466000

Frage

ich mit txredis erfolglos versucht (die nicht verdreht api für redis Blockierung) für eine anhaltende Nachrichtenwarteschlange Ich versuche, mit einem scrapy Projekt zur Einrichtung arbeite ich an. Ich fand, dass, obwohl der Client nicht blockiert wurde, wurde es langsamer viel, als es da gewesen sein könnte, was ein Ereignis in der Reaktorschleife sein sollte bis in die Tausende von Schritten aufgeteilt wurde.

So stattdessen habe ich versucht, den Einsatz von redis-py machen (die regelmäßige Sperrung verdrehte api) und den Anruf in einem latenten Faden gewickelt wird. Es funktioniert großartig, aber ich will latenten eine innere auszuführen, wenn ich einen Anruf zu redis machen, wie ich oben weiter versucht, Geschwindigkeit Dinge Verbindungspooling einrichten möchten.

Im Folgenden finden Sie meine Interpretation einiger Codebeispiel aus den verdrehten docs für einen latenten Faden genommen meine Anwendungsfall veranschaulichen:

#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'


def aBlockingRedisCall():
    print 'doing lookup... this may take a while'
    time.sleep(10)
    return 'results from redis'

def result(res):
    print res

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)
    d = threads.deferToThread(aBlockingRedisCall)
    d.addCallback(result)
    reactor.run()

if __name__=='__main__':
    main()

Und hier ist meine Veränderung für Connection Pooling, die den Code in dem latenten Fadensperr macht:

#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        time.sleep(10) # this is now blocking.. any ideas?
        d = defer.Deferred()
        d.addCallback(gotFinalResult)
        d.callback(x)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Also meine Frage ist, hat jemand wissen, warum meine Veränderung der latenten Faden bewirkt, daß Blockieren und / oder kann jemand empfehlen, eine bessere Lösung?

War es hilfreich?

Lösung

Nun, da die verdrehten docs sagen:

  

Deferreds machen Sie nicht den Code   magisch nicht blockieren

Wenn Sie verwenden Code blockieren, wie sleep, haben Sie es zu einem neuen Thread zu verschieben.

#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time

def main_loop():
    print 'doing stuff in main loop.. do not block me!'

def aBlockingRedisCall(x):
    if x<5: #all connections are busy, try later
        print '%s is less than 5, get a redis client later' % x
        x+=1
        d = defer.Deferred()
        d.addCallback(aBlockingRedisCall)
        reactor.callLater(1.0,d.callback,x)
        return d

    else: 
        print 'got a redis client; doing lookup.. this may take a while'
        def getstuff( x ):
            time.sleep(3)
            return "stuff is %s" % x

        # getstuff is blocking, so you need to push it to a new thread
        d = threads.deferToThread(getstuff, x)
        d.addCallback(gotFinalResult)
        return d

def gotFinalResult(x):
    return 'final result is %s' % x

def result(res):
    print res

def aBlockingMethod():
    print 'going to sleep...'
    time.sleep(10)
    print 'woke up'

def main():
    lc = LoopingCall(main_loop)
    lc.start(2)


    d = defer.Deferred()
    d.addCallback(aBlockingRedisCall)
    d.addCallback(result)
    reactor.callInThread(d.callback, 1)
    reactor.run()

if __name__=='__main__':
    main()

Bei der redis api nicht sehr komplex ist, könnte es natürlich sein, es neu zu schreiben twisted.web verwenden, anstatt nur die Blockierung api in vielen Threads aufgerufen wird.

Andere Tipps

Es gibt auch ein up-to-date Redis-Client für verdrillte, die bereits das neue Protokoll unterstützt und verfügen über von Redis 2.x. Sie sollten es auf jeden Fall versuchen. Es heißt txredisapi.

Für die persistente Nachrichten-Warteschlange, würde ich RestMQ empfehlen. Ein redis basierte Nachrichtenwarteschlangensystem auf der Oberseite des Zyklons und txredisapi gebaut.

http://github.com/gleicon/restmq

Prost

über einen entsprechenden Hinweis, könnten Sie wahrscheinlich viel gewinnen durch einen Redis-Client speziell für Twisted erstellt, wie diese: http://github.com/deldotdr/txRedis

Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top