Meitham


Self Ramblings

Stuff that matters to me


Timeout a Python Callable

written on Saturday, December 15, 2012

I have recently been working for a large media client in London, building libraries for their testers to use. One of their requirements was the ability to execute a Python function within a time limit, i.e. the function should abort if it exceeds the given limit.

Caveats

I looked around the ActiveState recipes for the most Pythonic way of doing this, and it turned out to be using threads. The problem, though, is that threads don't always switch context back to the original caller. For example, threading.Thread.join([timeout]) won't time out if the thread is not letting go of the GIL. However, this is not a major issue in the libraries we have, so I went ahead with this approach.
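
To make the caveat concrete, here is a minimal sketch (Python 3 syntax; the `slow` function and the 0.5 and 0.1 second figures are made up for illustration). It shows that `join(timeout)` only stops waiting: the worker thread is still alive after the call returns.

```python
import threading
import time


def slow():
    # Hypothetical stand-in for a long-running call.
    time.sleep(0.5)


worker = threading.Thread(target=slow)
worker.daemon = True  # do not keep the interpreter alive for this demo
worker.start()

worker.join(0.1)  # wait at most 0.1 seconds for the thread to finish
still_running = worker.is_alive()  # join() returned, but the thread did not stop

worker.join()  # wait without a limit, so the thread can finish
finished = not worker.is_alive()
```

This is why the caller has to check `is_alive()` after `join(timeout)` to find out whether the time limit was hit.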

Implementation

I started with a slightly modified version of Aaron Swartz's solution on ActiveState, but that was not enough. Basically, Aaron's version lacked two features that I really needed:

  1. The ability to terminate the thread when it times out.
  2. The ability to get a full traceback from the line inside the thread where the exception has been raised.

It turned out that it was easy to implement both!

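An exception raised inside a thread is handled in that thread's context, and so causes the thread to abort. So I've added the method below on the thread itself. Note that the exception is raised in whichever thread calls abort(): when the caller invokes it after a timeout, the RuntimeError surfaces in the caller, while the worker itself keeps running until the function returns.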

def abort(self):
    raise RuntimeError("Thread aborted as requested.")
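
A quick way to check in which context such a method runs is to record the thread identifier inside it. The sketch below uses Python 3 syntax, and the `Worker` class and its `worker_ident` and `abort_ident` attributes are hypothetical names used only for this illustration.

```python
import threading
import time


class Worker(threading.Thread):
    def run(self):
        # Executed in the new thread.
        self.worker_ident = threading.get_ident()
        time.sleep(0.2)

    def abort(self):
        # Executed in whichever thread calls abort().
        self.abort_ident = threading.get_ident()
        raise RuntimeError("Thread aborted as requested.")


w = Worker()
w.daemon = True
w.start()

caller_ident = threading.get_ident()
try:
    w.abort()
    raised_in_caller = False
except RuntimeError:
    raised_in_caller = True

alive_after_abort = w.is_alive()  # the worker is still sleeping at this point
w.join()
```

Here `abort_ident` matches the caller's identifier rather than the worker's, and the worker is still alive right after the exception has been caught.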

Getting the full traceback from the thread takes two steps. The first step is to capture the details inside the thread using sys.exc_info(), which returns a tuple of (exception class, exception object, traceback object). The second step is to re-raise this tuple in the context of the thread's caller.

class TimedThread(threading.Thread):
    """An abortable thread, by merely raising an exception inside its
    context.
    """
    def __init__(self):
        super(TimedThread, self).__init__()
        self.exc_info = (None, None, None)

    def run(self):
        self.started_at = datetime.now()
        try:
            # func, args and kwargs come from the enclosing function's scope
            self.result = func(*args, **kwargs)
        except:
            # save the exception as an object attribute
            self.exc_info = sys.exc_info()
            self.result = None
        # record the end time whether or not an exception was raised
        self.ended_at = datetime.now()

    def abort(self):
        self.ended_at = datetime.now()
        raise RuntimeError("Thread aborted as requested.")
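
The two steps can be tried in isolation. The sketch below is written in Python 3 syntax, where the three-argument `raise` is replaced by `with_traceback()`. `CapturingThread` and `broken` are hypothetical names for this example only.

```python
import sys
import threading
import traceback


class CapturingThread(threading.Thread):
    def __init__(self, func):
        super().__init__()
        self.func = func
        self.exc_info = (None, None, None)

    def run(self):
        try:
            self.func()
        except Exception:
            # Step 1: capture the exception details inside the thread.
            self.exc_info = sys.exc_info()


def broken():
    return 1 / 0


t = CapturingThread(broken)
t.start()
t.join()

exc_type, exc_value, exc_tb = t.exc_info
try:
    # Step 2: re-raise in the caller, keeping the thread's traceback.
    raise exc_value.with_traceback(exc_tb)
except ZeroDivisionError:
    frames = traceback.extract_tb(sys.exc_info()[2])
    function_names = [frame.name for frame in frames]
```

The traceback seen by the caller still contains the frames for `run` and `broken`, so the failing line inside the thread is not lost.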

Here is the full version of the code

from datetime import datetime
import sys
import threading
import time

class TimeoutError(Exception):
    """Shall be raised when exceeding allowed time limit
    """


def timed_run(func, args=(), kwargs={}, timeout=None):
    """Run ``func`` and return its result, or raise RuntimeError if it does
    not complete within ``timeout`` seconds (when ``timeout`` is not None).
    The worker thread itself is not stopped and keeps running.

    WARNING: This won't work on functions that don't release the GIL or don't
    return to the ceval loop.

    >>> def f(x=1):
    ...     time.sleep(5)
    ...     return x/1
    >>> timed_run(f)  # should not timeout
    1
    >>> timed_run(f, timeout=2)  # should timeout
    Traceback (most recent call last):
    ...
    RuntimeError: Thread aborted as requested.
    >>> def g(x=None):
    ...     print('hello')
    ...     f(None)
    >>> timed_run(g)  # should raise exception
    Traceback (most recent call last):
    ...
    TypeError: unsupported operand type(s) for /: 'NoneType' and 'int'
    """
    class TimedThread(threading.Thread):
        """An abortable thread, by merely raising an exception inside its
        context.
        """
        def __init__(self):
            super(TimedThread, self).__init__()
            self.exc_info = (None, None, None)

        def run(self):
            self.started_at = datetime.now()
            try:
                self.result = func(*args, **kwargs)
            except:
                # save the exception as an object attribute
                self.exc_info = sys.exc_info()
                self.result = None
            self.ended_at = datetime.now()

        def abort(self):
            self.ended_at = datetime.now()
            raise RuntimeError("Thread aborted as requested.")

    t = TimedThread()
    t.start()
    t.join(timeout)
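    if t.exc_info[0] is not None:  # if there were any exceptions
        # Use separate names so the thread object ``t`` is not shadowed.
        exc_type, exc_value, exc_tb = t.exc_info
        # Re-raise with the original traceback inside the caller (Python 2
        # syntax; in Python 3 use ``raise exc_value.with_traceback(exc_tb)``).
        raise exc_type, exc_value, exc_tb
    if t.is_alive():
        # abort() raises RuntimeError here, in the caller's context, so no
        # code placed after this call in the branch would ever run.
        t.abort()
    return t.result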
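
For readers on Python 3, the same idea can be sketched as below. This is not the listing above but a hedged port under a few assumptions of mine: the name `timed_run_py3` is hypothetical, the worker is a daemon thread so a stuck call does not block interpreter exit, and a timeout raises the built-in `TimeoutError` instead of `RuntimeError`.

```python
import threading
import time


def timed_run_py3(func, args=(), kwargs=None, timeout=None):
    """Run func(*args, **kwargs); raise TimeoutError if it takes too long."""
    kwargs = kwargs or {}
    outcome = {}

    def target():
        try:
            outcome["result"] = func(*args, **kwargs)
        except BaseException as exc:
            # Keep the exception object; it carries its own traceback.
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    started_at = time.monotonic()
    worker.start()
    worker.join(timeout)

    if worker.is_alive():
        elapsed = time.monotonic() - started_at
        # The worker is NOT stopped; it keeps running in the background.
        raise TimeoutError("%r timed out after %.1f seconds" % (func, elapsed))
    if "error" in outcome:
        # Re-raise in the caller; the thread's frames stay in the traceback.
        raise outcome["error"]
    return outcome["result"]


def double(x):
    return x * 2


answer = timed_run_py3(double, args=(21,), timeout=1)
```

As with the original, the time limit only bounds how long the caller waits. A function that never returns will keep its thread busy after the exception has been raised.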

This entry was tagged python and threading