Traditionally, you force race conditions in multithreaded code with semaphores: one thread is made to wait until another thread has reached some edge condition before it continues.
For example, your object has some code to check that start is not called if the object is already running. You could force this condition to make sure it behaves as expected by doing something like this:
- starting a KitchenTimer
- having the timer block on a semaphore while in the running state
- starting the same timer in another thread
- catching AlreadyRunningError
To do some of this you may need to extend the KitchenTimer class. Formal unit tests will often use mock objects which are defined to block at critical times. Mock objects are a bigger topic than I can address here, but googling "python mock object" will turn up a lot of documentation and many implementations to choose from.
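As a rough illustration of a mock that blocks at a critical time, here is a minimal sketch using the standard library's unittest.mock. The run_in_background helper and the two Event names are made up for this example and have nothing to do with KitchenTimer; the only point is that a Mock with a side_effect can park the calling thread until the test lets it go.

```python
import threading
from unittest import mock

# Hypothetical code under test: runs a callback in a worker thread.
def run_in_background(callback):
    worker = threading.Thread(target=callback)
    worker.start()
    return worker

entered = threading.Event()   # set by the mock once it has been called
release = threading.Event()   # set by the test to let the mock return

def block_until_released():
    entered.set()
    # Timeout so that a broken test cannot hang forever.
    release.wait(timeout=5)

# The mock records the call and then runs the blocking side effect.
callback = mock.Mock(side_effect=block_until_released)

worker = run_in_background(callback)
entered.wait(timeout=5)

# The worker is now parked inside the mock. This is the window in which a
# test would poke at the shared object from another thread.
still_blocked = worker.is_alive()

release.set()
worker.join(timeout=5)

print("blocked inside mock: %r" % still_blocked)
print("mock call count: %d" % callback.call_count)
```

The timeouts are only a safety net; in a real test you would probably assert on the return value of wait() as well.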
Here's a way that you could force your code to throw AlreadyRunningError:
    import threading

    class TestKitchenTimer(KitchenTimer):
        # Class-level Condition, shared by every TestKitchenTimer instance.
        _runningLock = threading.Condition()

        def start(self, duration=1, whenTimeUp=None):
            KitchenTimer.start(self, duration, whenTimeUp)
            with self._runningLock:
                print("waiting on _runningLock")
                self._runningLock.wait()

        def resume(self):
            with self._runningLock:
                self._runningLock.notify()

    timer = TestKitchenTimer()

    # Start the timer in a subthread. This thread will block as soon as
    # it is started.
    thread_1 = threading.Thread(target=timer.start, args=(10, None))
    thread_1.start()

    # Attempt to start the timer in a second thread, causing it to throw
    # an AlreadyRunningError. An exception raised inside a thread does not
    # propagate to the code that called Thread.start(), so it has to be
    # caught inside the thread's own target function. (This still assumes
    # that thread_1 gets into start() first.)
    def start_again():
        try:
            timer.start(10, None)
        except AlreadyRunningError:
            print("AlreadyRunningError")

    thread_2 = threading.Thread(target=start_again)
    thread_2.start()
    thread_2.join()

    timer.resume()
    timer.stop()
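One thing to watch for with the Condition used above: notify() only wakes threads that are already waiting, so if resume() happens to run before the timer thread reaches wait(), the wakeup is lost and the thread blocks forever. A threading.Event remembers that it was set, which makes the ordering harmless. This is a standalone sketch, not wired into KitchenTimer; the gate and worker names are made up for the example.

```python
import threading

gate = threading.Event()
results = []

def worker():
    # Event.wait() returns True if the event is set, even when set() ran
    # before wait() was called. It returns False only if the timeout expires.
    results.append(gate.wait(timeout=5))

# The "resume" happens first...
gate.set()

# ...and the waiting thread only arrives afterwards.
t = threading.Thread(target=worker)
t.start()
t.join(timeout=5)

print("waiter released: %r" % (results == [True]))
```

If you swap the Condition for an Event in the test subclass, resume() would call set() and the blocking method would call wait(), and the two can then run in either order.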
Reading through the code, identify some of the boundary conditions you want to test, then think about where you would need to pause the timer to force that condition to arise, and add Conditions, Semaphores, Events, etc. to make it happen. For example, what happens if, just as the timer runs the whenTimeUp callback, another thread tries to stop it? You can force that condition by making the timer wait as soon as it has entered _whenTimeup:
    import threading
    import time

    class TestKitchenTimer(KitchenTimer):
        _runningLock = threading.Condition()

        def _whenTimeup(self):
            # Pause here until resume() is called, then run the real handler.
            with self._runningLock:
                self._runningLock.wait()
            KitchenTimer._whenTimeup(self)

        def resume(self):
            with self._runningLock:
                self._runningLock.notify()

    def TimeupCallback():
        print("TimeupCallback was called")

    timer = TestKitchenTimer()

    # The timer thread will block when the timer expires, but before the callback
    # is invoked.
    thread_1 = threading.Thread(target=timer.start, args=(1, TimeupCallback))
    thread_1.start()
    time.sleep(2)

    # The timer is now blocked. In the parent thread, we stop it.
    timer.stop()
    print("timer is stopped: %r" % timer.isStopped())

    # Now allow the countdown thread to resume.
    timer.resume()
Subclassing the class you want to test isn't an awesome way to instrument it for testing: you'll have to override basically all of the methods in order to test race conditions in each one, and at that point there's a good argument to be made that you're not really testing the original code. Instead, you may find it cleaner to put the semaphores right in the KitchenTimer object but initialized to None by default, and have your methods check that testRunningLock is not None before acquiring or waiting on the lock. Then you can force races on the actual code that you're submitting.
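To make the "hook that defaults to None" idea concrete, here is a minimal sketch. SketchTimer is a hypothetical stand-in, not the real KitchenTimer, and the testRunningReached attribute is an extra I added so the test knows when the first thread has reached the hook. The hooks here are Events rather than semaphores, purely to keep the example short.

```python
import threading

class AlreadyRunningError(Exception):
    pass

class SketchTimer(object):
    """Hypothetical stand-in for KitchenTimer, for illustration only."""

    def __init__(self):
        self._stateLock = threading.Lock()
        self._running = False
        # Test hooks. Production code leaves both as None.
        self.testRunningLock = None      # Event the timer waits on while running
        self.testRunningReached = None   # Event the timer sets just before waiting

    def start(self):
        with self._stateLock:
            if self._running:
                raise AlreadyRunningError()
            self._running = True
        # Skipped entirely unless a test has installed the hook.
        if self.testRunningLock is not None:
            if self.testRunningReached is not None:
                self.testRunningReached.set()
            self.testRunningLock.wait(timeout=5)

    def stop(self):
        with self._stateLock:
            self._running = False

timer = SketchTimer()
timer.testRunningLock = threading.Event()
timer.testRunningReached = threading.Event()

# The first start() runs in a subthread and parks at the hook.
first = threading.Thread(target=timer.start)
first.start()
timer.testRunningReached.wait(timeout=5)

# The second start() runs in this thread, so the exception can be caught here.
try:
    timer.start()
    raised = False
except AlreadyRunningError:
    raised = True

# Release the parked thread and clean up.
timer.testRunningLock.set()
first.join(timeout=5)
timer.stop()

print("second start raised AlreadyRunningError: %r" % raised)
```

The cost of this approach is a few extra "is not None" checks in the production code path, which you may or may not find acceptable.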
Here is some reading on Python mock frameworks that may be helpful. In fact, I'm not sure that mocks would be helpful in testing this code: it's almost entirely self-contained and doesn't rely on many external objects. But mock tutorials sometimes touch on issues like these. I haven't used any of these, but the documentation on these looks like a good place to get started: