Based on the answer found in "Timeout function using threading in python does not work". If you try it out on foo(x),
it does indeed stop counting, unlike my previous answer.
import multiprocessing as mp
import timeit
def timeout(func, args=(), kwargs=None, TIMEOUT=10, default=None, err=.05):
    """Run ``func(*args, **kwargs)`` in a worker process with a time limit.

    The call is dispatched to a single-process ``multiprocessing`` pool, so
    ``func`` and its arguments must be picklable.  If no result arrives
    within ``TIMEOUT * (1 + err)`` seconds the worker is terminated and
    ``default`` is returned instead.

    Args:
        func: Callable to execute in the worker process.
        args: Positional arguments for ``func``.  A string, bytes object or
            any non-iterable value is treated as ONE positional argument
            rather than being unpacked.
        kwargs: Keyword arguments for ``func``; ``None`` means no keywords.
        TIMEOUT: Time limit in seconds.
        default: Value returned when the time limit is exceeded.
        err: Fractional tolerance added on top of ``TIMEOUT``.

    Returns:
        The return value of ``func``, or ``default`` on timeout.

    Raises:
        Any exception raised by ``func`` itself is re-raised in the caller.
    """
    # Strings are iterable on Python 3 but must not be unpacked character by
    # character; ``(str, bytes)`` replaces the Python-2-only ``basestring``.
    if isinstance(args, (str, bytes)) or not hasattr(args, "__iter__"):
        args = [args]
    kwargs = {} if kwargs is None else kwargs

    pool = mp.Pool(processes=1)
    try:
        async_result = pool.apply_async(func, args=args, kwds=kwargs)
        val = async_result.get(timeout=TIMEOUT * (1 + err))
    except mp.TimeoutError:
        # The worker is still busy: kill it instead of waiting for it.
        pool.terminate()
        return default
    except BaseException:
        # ``func`` raised (or the wait was interrupted): do not leak the
        # worker process, then let the caller see the original exception.
        pool.terminate()
        raise
    else:
        # Normal completion: let the idle worker shut down cleanly.
        pool.close()
        pool.join()
        return val
def Timeit(command, setup=''):
    """Time a single execution of ``command`` and return the seconds taken.

    ``setup`` is handed to ``timeit.Timer`` as its setup statement; the
    timed statement itself is executed exactly once.
    """
    timer = timeit.Timer(command, setup=setup)
    elapsed = timer.timeit(1)
    return elapsed
def timeit_timeout(command, setup='', TIMEOUT=10, default=None, err=.05):
    """Time one run of ``command`` via ``Timeit``, bounded by ``timeout``.

    ``command`` and ``setup`` are forwarded to ``Timeit``; ``TIMEOUT``,
    ``default`` and ``err`` are forwarded unchanged to ``timeout``, whose
    return value is returned as-is.
    """
    timer_kwargs = {'setup': setup}
    return timeout(
        Timeit,
        args=command,
        kwargs=timer_kwargs,
        TIMEOUT=TIMEOUT,
        default=default,
        err=err,
    )