Twisted : 연기 된 콜백을 연기 된 스레드에 전달하는 이유는 무엇입니까?
-
20-09-2019 - |
문제
나는 지속적인 메시지 대기열을 위해 TXREDIS (Redis의 비 블로킹 트위스트 API)를 사용하여 실패했습니다. 나는 클라이언트가 차단되지 않았지만 원자로 루프에서 한 이벤트가되어야했던 것이 수천 단계로 나뉘어 졌기 때문에 그랬던 것보다 훨씬 느려 졌다는 것을 알았습니다.
대신, 나는 Redis-Py (일반 차단 Twisted API)를 사용하고 연기 된 스레드에 전화를 감싸려 고 시도했습니다. 그것은 훌륭하게 작동하지만, 나는 더 빠른 속도를 높이려고 시도 할 때 연결 풀링을 설정하고 싶을 때 Redis를 호출 할 때 내부 연기를 수행하고 싶습니다.
아래는 내 유스 케이스를 설명하기 위해 연기 된 스레드에 대한 비틀림 문서에서 가져온 일부 샘플 코드에 대한 나의 해석입니다.
#!/usr/bin/env python
from twisted.internet import reactor,threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall():
print 'doing lookup... this may take a while'
time.sleep(10)
return 'results from redis'
def result(res):
print res
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = threads.deferToThread(aBlockingRedisCall)
d.addCallback(result)
reactor.run()
if __name__=='__main__':
main()
그리고 여기에 연기 된 스레드 차단에서 코드를 만드는 연결 풀링에 대한 나의 변경이 있습니다.
#!/usr/bin/env python
from twisted.internet import reactor,defer
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
time.sleep(10) # this is now blocking.. any ideas?
d = defer.Deferred()
d.addCallback(gotFinalResult)
d.callback(x)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
내 질문은 내 변경이 왜 이연 스레드가 차단되고/또는 누구든지 더 나은 솔루션을 제안 할 수있는 이유를 아는 사람이 있습니까?
해결책
글쎄, 뒤틀린 문서 말하다:
연기는 코드를 마술처럼 차단하지 않습니다
다음과 같은 차단 코드를 사용할 때마다 sleep
, 당신은 그것을 새 스레드로 연기해야합니다.
#!/usr/bin/env python
from twisted.internet import reactor,defer, threads
from twisted.internet.task import LoopingCall
import time
def main_loop():
print 'doing stuff in main loop.. do not block me!'
def aBlockingRedisCall(x):
if x<5: #all connections are busy, try later
print '%s is less than 5, get a redis client later' % x
x+=1
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
reactor.callLater(1.0,d.callback,x)
return d
else:
print 'got a redis client; doing lookup.. this may take a while'
def getstuff( x ):
time.sleep(3)
return "stuff is %s" % x
# getstuff is blocking, so you need to push it to a new thread
d = threads.deferToThread(getstuff, x)
d.addCallback(gotFinalResult)
return d
def gotFinalResult(x):
return 'final result is %s' % x
def result(res):
print res
def aBlockingMethod():
print 'going to sleep...'
time.sleep(10)
print 'woke up'
def main():
lc = LoopingCall(main_loop)
lc.start(2)
d = defer.Deferred()
d.addCallback(aBlockingRedisCall)
d.addCallback(result)
reactor.callInThread(d.callback, 1)
reactor.run()
if __name__=='__main__':
main()
Redis API가 그다지 복잡하지 않은 경우 많은 스레드에서 차단 API를 호출하는 대신 Twisted.Web을 사용하여 다시 작성하는 것이 더 자연 스러울 수 있습니다.
다른 팁
Twisted 용 최신 Redis 클라이언트도 이미 Redis 2.X의 새로운 프로토콜과 기능을 지원합니다. 당신은 결정적으로 시도해야합니다. TXREDISAPI라고합니다.
영구 메시지 대기열의 경우 restmq를 추천합니다. 사이클론과 Txredisapi 위에 구축 된 Redis 기반 메시지 큐 시스템.
http://github.com/gleicon/restmq
건배
관련 메모에서, 당신은 아마도 Twisted를 위해 특별히 만든 Redis 클라이언트를 사용하여 많은 것을 얻을 수 있습니다. http://github.com/deldotdr/txredis