Question

Before I start describing my question, it might be worth mentioning that I'm using Python 2.7. I haven't checked, but this might be irrelevant for Python 3.x.

While working with Python's Queue class, I've discovered something strange. Usually, when getting an object from a Queue, I allow a long but finite timeout (a few seconds), so that I can debug and report errors when no object arrives where one was expected. What I've found is that there is sometimes a surprising gap between the moment an object is inserted into a previously empty Queue and the moment the same Queue's get method returns that object, even though get was called before the put for that object.
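
Roughly, a minimal reproduction looks like this (the 1-second pause and the names are just illustrative, and the exact lag varies by machine):

from Queue import Queue
import threading
import time

q = Queue()
put_time = [None]

def producer():
    time.sleep(1.0)               # let the consumer block inside get() first
    put_time[0] = time.time()
    q.put("item")

threading.Thread(target=producer).start()
item = q.get(timeout=5)           # long but finite timeout, as described above
lag = time.time() - put_time[0]
print "get returned %r, %.1f ms after the put" % (item, lag * 1000.0)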

Digging a little, I've discovered that the gap was filled by sleeping. In the Queue module, if the timeout argument passed to the get method is not None and is positive, the not_empty Condition's wait method is called with a positive argument. (That is not 100% precise: the Queue's "_qsize" method, which returns the length of the underlying deque, is first checked to return 0; but as long as the queue was empty to begin with, the next thing that happens is the condition's wait.)
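
For reference, the timeout branch of Queue.get boils down to roughly the following (a paraphrase of the module's internals, not a verbatim copy):

# inside Queue.get(block=True, timeout=t), once not_empty's lock is held:
endtime = _time() + timeout
while not self._qsize():              # queue still empty
    remaining = endtime - _time()
    if remaining <= 0.0:
        raise Empty
    self.not_empty.wait(remaining)    # <-- Condition.wait with a timeout
item = self._get()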

The Condition's wait method acts differently depending on whether it gets a timeout. If it does not, it simply calls waiter.acquire; that is defined in C and is beyond what I understand, but it seems to work properly. However, if a timeout is given, a bizarre sequence of sleeps occurs instead, where the sleep times start small (1 millisecond) and grow over time. Here's the exact code which runs:

# Balancing act:  We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive.  The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + timeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
    gotit = waiter.acquire(0)
    if gotit:
        break
    remaining = endtime - _time()
    if remaining <= 0:
        break
    delay = min(delay * 2, remaining, .05)
    _sleep(delay)

This is clearly the reason for the gap I've found between the time the new object was put into the previously-empty Queue and the time the already-pending get call returned that object. As the delay grows exponentially until it is capped at a huge (from my perspective) 0.05 seconds, it creates surprising and unwanted sleeps in my application.
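
To put numbers on it: starting from the 0.5 ms seed, the successive sleeps are 1, 2, 4, 8, 16, 32 and then 50 ms (the cap), so once the waiter has been idle for a while a put can go unnoticed for up to about 50 ms. A quick way to see the progression (illustrative only):

delay = 0.0005
schedule = []
for _ in range(10):
    delay = min(delay * 2, .05)       # the 'remaining' term is omitted here
    schedule.append(delay)
print schedule
# [0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.05, 0.05, 0.05, 0.05]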

Can you explain what the purpose of this is? Do the Python developers assume no Python user will care about such time scales? Is there a quick workaround or a proper fix? Do you recommend overriding the threading module?


Solution

I recently got hit by the same problem, and I also tracked it down to this exact block of code in the threading module.

It sucks.


Can you explain what the purpose of this is? Do the Python developers assume no Python user will care about such time scales?

Beats me...


Do you recommend overriding the threading module?

Either override the threading module yourself, or migrate to python3, where this part of the implementation has been fixed.
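
(For context: in Python 3, Lock.acquire accepts a timeout argument, so Condition.wait can do a single timed acquire instead of the sleep loop. The waiting part becomes something like the following sketch -- not a verbatim copy of the stdlib code:)

if timeout is None:
    waiter.acquire()                          # block until notified
    gotit = True
elif timeout > 0:
    gotit = waiter.acquire(True, timeout)     # blocks at most `timeout` seconds
else:
    gotit = waiter.acquire(False)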

In my case, migrating to python3 would have been a huge effort, so I chose the former. What I did was:

  1. I created a quick .so file (using Cython) with an interface to pthread. It includes Python functions that invoke the corresponding pthread_mutex_* functions, and it links against libpthread. The function most relevant to the task at hand is pthread_mutex_timedlock.
  2. I created a new threading2 module (and replaced all import threading lines in my codebase with import threading2). In threading2, I re-defined all the relevant classes from threading (Lock, Condition, Event), as well as the ones from Queue that I use a lot (Queue and PriorityQueue). The Lock class was completely re-implemented using the pthread_mutex_* functions, but the rest were much easier -- I simply subclassed the originals (e.g. threading.Event) and overrode __init__ to create my new Lock type. The rest just worked (see the sketch after this list).
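
To give a feel for the shape of it, here is a condensed sketch of the Lock side. pthread_wrap and its functions stand in for the Cython extension from step 1 -- they are not a real module -- and error handling is omitted:

# threading2.py (sketch)
import pthread_wrap   # hypothetical Cython .so exposing the pthread_mutex_* calls

class Lock(object):
    def __init__(self):
        self._mutex = pthread_wrap.mutex_init()             # pthread_mutex_init

    def acquire(self, blocking=True, timeout=None):
        if not blocking:
            return pthread_wrap.mutex_trylock(self._mutex)  # pthread_mutex_trylock
        if timeout is None:
            pthread_wrap.mutex_lock(self._mutex)            # pthread_mutex_lock
            return True
        # pthread_mutex_timedlock takes an absolute deadline; the wrapper
        # converts the relative timeout into a struct timespec for us.
        return pthread_wrap.mutex_timedlock(self._mutex, timeout)

    def release(self):
        pthread_wrap.mutex_unlock(self._mutex)              # pthread_mutex_unlock

    __enter__ = acquire
    def __exit__(self, *exc_info):
        self.release()

The Condition and Event subclasses then only need their __init__ overridden to use this Lock.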

The implementation of the new Lock type was very similar to the original implementation in threading, but I based the new implementation of acquire on the code I found in python3's threading module (which, naturally, is much simpler than the above-mentioned "balancing act" block). This part was fairly easy.

(Btw, the result in my case was 30% speedup of my massively-multithreaded process. Even more than I expected.)

I hope this helps.

Other tips

One way to be sure that Queue is not doing something weird is to use the get_nowait method and catch the Empty exception. Take a look at these lines, which I have running on our production servers (modified, of course, to fit this example).

from Queue import Queue, Empty
from time import sleep

while receiver.isAlive():      # isAlive is a method, so it must be called
    try:
        rec = Record(queue.get_nowait())
    except Empty:
        # Set someTime to whatever polling interval suits you
        someTime = 0.1
        sleep(someTime)
    else:
        doSomething(rec)

Also, keep in mind the following:

The time.sleep() function uses the underlying operating system's sleep() function. Ultimately there are limitations of this function. For example, on a standard Windows installation, the smallest interval you may sleep is 10-13 milliseconds. The Linux kernels tend to have a higher tick rate, where the intervals are generally closer to 1 millisecond.
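
If you want to check what your own platform actually gives you, a quick (unscientific) measurement along these lines will do:

# Rough check of the effective sleep resolution on this machine
import time

requested = 0.001                     # ask for 1 ms
samples = []
for _ in range(100):
    start = time.time()
    time.sleep(requested)
    samples.append(time.time() - start)
samples.sort()
print "requested %.1f ms, median actual %.1f ms" % (
    requested * 1000.0, samples[50] * 1000.0)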
