Question

What event system for Python do you use? I'm already aware of pydispatcher, but I was wondering what else can be found, or is commonly used?

I'm not interested in event managers that are part of large frameworks, I'd rather use a small bare-bones solution that I can easily extend.

Solution

Wrapping up the various event systems that are mentioned in the answers here:

The most basic style of event system is the 'bag of handler methods', which is a simple implementation of the Observer pattern. Basically, the handler methods (callables) are stored in a list and are each called when the event 'fires'.

  • zope.event shows the bare bones of how this works (see Lennart's answer). Note: this example does not even support handler arguments.
  • LongPoke's 'callable list' implementation shows that such an event system can be implemented very minimalistically by subclassing list.
  • spassig's EventHook (Michael Foord's Event Pattern) is a straightforward implementation.
  • Josip's Valued Lessons Event class is basically the same, but uses a set instead of a list to store the bag and implements __call__, which are both reasonable additions.
  • PyNotify is similar in concept and also provides additional concepts of variables and conditions ('variable changed event').
  • axel is basically a bag-of-handlers with more features related to threading, error handling, ...
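To make the 'bag of handlers' idea concrete, here is a minimal sketch. It is not taken from any of the libraries listed above; the names Event, connect, disconnect and fire are just illustrative choices.

```python
class Event:
    """A 'bag of handlers': callables kept in a list and called in order."""

    def __init__(self):
        self._handlers = []

    def connect(self, handler):
        self._handlers.append(handler)

    def disconnect(self, handler):
        self._handlers.remove(handler)

    def fire(self, *args, **kwargs):
        # Iterate over a copy so a handler may disconnect itself while firing.
        for handler in list(self._handlers):
            handler(*args, **kwargs)


received = []
on_saved = Event()  # the Event object has to exist before anyone can register
on_saved.connect(received.append)
on_saved.connect(lambda path: received.append(path.upper()))
on_saved.fire("report.txt")
print(received)  # ['report.txt', 'REPORT.TXT']
```

Note that handlers can only be attached once on_saved has been created, since registration happens on the Event object itself.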

The disadvantage of these event systems is that you can only register the handlers on the actual Event object (or handlers list). So at registration time the event already needs to exist.

That's why the second style of event systems exists: the publish-subscribe pattern. Here, the handlers don't register on an event object (or handler list), but on a central dispatcher. The notifiers also talk only to the dispatcher. What to listen for, or what to publish, is determined by a 'signal', which is nothing more than a name (a string).

  • blinker has some nifty features such as automatic disconnection and filtering based on sender.
  • PyPubSub at first sight seems to be pretty straightforward.
  • PyDispatcher seems to emphasize flexibility with regards to many-to-many publication etc.
  • louie is a reworked PyDispatcher "providing plugin infrastructure including Twisted and PyQt specific support". It seems to have lost maintenance after January 2016.
  • django.dispatch is a rewritten PyDispatcher "with a more limited interface, but higher performance".
  • Qt's Signals and Slots are available from PyQt or PySide. They work as callbacks when used in the same thread, or as events (using an event loop) between two different threads. Signals and Slots have the limitation that they only work in objects of classes that derive from QObject.
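For comparison with the bag-of-handlers style, a publish-subscribe dispatcher can be sketched roughly like this. It is a simplified illustration, not the API of blinker, PyPubSub or PyDispatcher; Dispatcher, subscribe, unsubscribe and publish are made-up names.

```python
from collections import defaultdict


class Dispatcher:
    """Central registry mapping signal names (strings) to handler lists."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, signal, handler):
        # The signal does not need to exist beforehand; the name is enough.
        self._handlers[signal].append(handler)

    def unsubscribe(self, signal, handler):
        self._handlers[signal].remove(handler)

    def publish(self, signal, *args, **kwargs):
        # Publishing a signal nobody listens to is a silent no-op.
        for handler in list(self._handlers.get(signal, ())):
            handler(*args, **kwargs)


dispatcher = Dispatcher()
log = []
dispatcher.subscribe("user.created", lambda name: log.append("welcome " + name))
dispatcher.publish("user.created", "alice")
dispatcher.publish("user.deleted", "bob")  # no subscribers, nothing happens
print(log)  # ['welcome alice']
```

Publishers and subscribers only share the dispatcher and the signal name, so neither side has to know about the other.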

Note: threading.Event is not an 'event system' in the above sense. It's a thread synchronization system where one thread waits until another thread 'signals' the Event object.
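A short sketch of what threading.Event is actually for, using only the standard library. The worker function and the results list are illustrative names.

```python
import threading

ready = threading.Event()
results = []


def worker():
    # Blocks until another thread calls ready.set() (or the timeout expires).
    if ready.wait(timeout=5):
        results.append("worker proceeded")


t = threading.Thread(target=worker)
t.start()
ready.set()  # wakes up every thread waiting on the Event
t.join()
print(results)  # ['worker proceeded']
```

No handlers are registered and no arguments are passed around; the Event is only a flag that threads can wait on.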

Note: not yet included above are pypydispatcher and python-dispatch; the 'hook system' of pluggy might be of interest as well.

OTHER TIPS

I've been doing it this way:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print('f(%s)' % x)
    >>> def g(x):
    ...     print('g(%s)' % x)
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

However, like with everything else I've seen, there is no auto generated pydoc for this, and no signatures, which really sucks.

We use an EventHook as suggested by Michael Foord in his Event Pattern:

Just add EventHooks to your classes with:

class MyBroadcaster(object):
    def __init__(self):
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

We added the ability to remove all of an object's listeners to Michael's class and ended up with this:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

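    def clearObjectHandlers(self, inObject):
        # Iterate over a copy, since removing entries mutates the list.
        for theHandler in self.__handlers[:]:
            # Bound methods expose their instance as __self__
            # (im_self was the Python 2 spelling); plain functions have neither.
            if getattr(theHandler, '__self__', None) is inObject:
                self -= theHandler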

I use zope.event. It's the most bare bones you can imagine. :-) In fact, here is the complete source code:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Note that you can't send messages between processes, for example. It's not a messaging system, just an event system, nothing more, nothing less.
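A usage sketch for this style. To keep it runnable without installing anything, the two definitions are copied from the source shown above as a stand-in for the real package; FileSaved and on_any_event are hypothetical names.

```python
# Stand-in for zope.event, copied from the snippet above.
subscribers = []


def notify(event):
    for subscriber in subscribers:
        subscriber(event)


class FileSaved:  # hypothetical event class
    def __init__(self, path):
        self.path = path


seen = []


def on_any_event(event):
    # Every subscriber receives every event, so filter by type yourself.
    if isinstance(event, FileSaved):
        seen.append(event.path)


subscribers.append(on_any_event)
notify(FileSaved("notes.txt"))
notify("some unrelated event")  # ignored by the isinstance check
print(seen)  # ['notes.txt']
```

Since there is only one global list, each subscriber has to decide for itself which events it cares about.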

I found this small script on Valued Lessons. It seems to have just the right simplicity/power ratio I'm after. Peter Thatcher is the author of the following code (no licensing is mentioned).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except KeyError:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print("%r changed." % (source_path,))

def log_file_change2(source_path):
    print("%r changed!" % (source_path,))

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

You may have a look at pymitter (pypi). It's a small single-file (~250 loc) approach "providing namespaces, wildcards and TTL".

Here's a basic example:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
    print("handler1 called with", arg)

# callback usage
def handler2(arg):
    print("handler2 called with", arg)
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

I created an EventManager class (code at the end). The syntax is the following:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number of events with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Here is an example:

def hello(name):
    print("Hello {}".format(name))

def greetings(name):
    print("Greetings {}".format(name))

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print("\nInitial salute")
EventManager.salute('Oscar')

print("\nNow remove greetings")
EventManager.salute -= greetings
EventManager.salute('Oscar')

Output:

Initial salute
Greetings Oscar
Hello Oscar

Now remove greetings
Hello Oscar

EventManager code:

class EventManager:

    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions

        def __iadd__(self,func):
            self.functions.append(func)
            return self

        def __isub__(self,func):
            self.functions.remove(func)
            return self

        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)

    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event receives a list of functions,
        where every function in the list receives the same parameters.

        Example:

        def hello(): print("Hello", end=" ")
        def world(): print("World")

        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world

        EventManager.salute()

        Output:
        Hello World
        """
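        events = {}
        for key, functions in kvargs.items():
            if type(functions) is not list:
                raise ValueError("value has to be a list")
            events[key] = cls.Event(functions)

        # cls.__dict__ is a read-only mappingproxy on Python 3 classes,
        # so set the attributes with setattr instead of __dict__.update().
        for key, event in events.items():
            setattr(cls, key, event)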

Here is a minimal design that should work fine. All you have to do is inherit from Observer in a class and then use observe(event_name, callback_fn) to listen for a specific event. Whenever that specific event is fired anywhere in the code (i.e. Event('USB connected')), the corresponding callback will fire.

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Example:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

I made a variation of Longpoke's minimalistic approach that also ensures the signatures for both callees and callers:

import inspect


class EventHook(object):
    '''
    A simple implementation of the Observer pattern.
    The user can specify an event signature upon initialization,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook instance with fitting
    keyword arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these keyword arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

If I code in PyQt I use Qt's signals/slots paradigm; the same goes for Django.

If I'm doing async I/O I use the native select module.

If I'm using a SAX Python parser I use the event API provided by SAX. So it looks like I'm a victim of the underlying API :-)

Maybe you should ask yourself what you expect from an event framework/module. My personal preference is to use the signals/slots paradigm from Qt. More info about that can be found here.

Here's another module for consideration. It seems a viable choice for more demanding applications.

Py-notify is a Python package providing tools for implementing Observer programming pattern. These tools include signals, conditions and variables.

Signals are lists of handlers that are called when signal is emitted. Conditions are basically boolean variables coupled with a signal that is emitted when condition state changes. They can be combined using standard logical operators (not, and, etc.) into compound conditions. Variables, unlike conditions, can hold any Python object, not just booleans, but they cannot be combined.
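To illustrate the 'condition' concept described above, here is a rough sketch. This is not Py-notify's actual API; the Signal and Condition classes below are made up for illustration and leave out compound conditions.

```python
class Signal:
    """A list of handlers that are called when the signal is emitted."""

    def __init__(self):
        self._handlers = []

    def connect(self, handler):
        self._handlers.append(handler)

    def emit(self, *args):
        for handler in list(self._handlers):
            handler(*args)


class Condition:
    """A boolean state plus a signal emitted only when the state changes."""

    def __init__(self, state=False):
        self._state = bool(state)
        self.changed = Signal()

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        value = bool(value)
        if value != self._state:
            self._state = value
            self.changed.emit(value)


is_online = Condition()
history = []
is_online.changed.connect(history.append)
is_online.state = True
is_online.state = True   # no change, so nothing is emitted
is_online.state = False
print(history)  # [True, False]
```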

If you want to do more complicated things, like merging events or retrying, you can use the Observable pattern and a mature library that implements it: https://github.com/ReactiveX/RxPY. Observables are very common in JavaScript and Java and are very convenient for some async tasks.

# Written against the RxPY 1.x API; later major versions reorganized these imports.
from rx import Observable, Observer


def push_five_strings(observer):
    observer.on_next("Alpha")
    observer.on_next("Beta")
    observer.on_next("Gamma")
    observer.on_next("Delta")
    observer.on_next("Epsilon")
    observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

OUTPUT:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

You can try the buslane module.

This library makes implementing a message-based system easier. It supports the command (single handler) and event (zero or more handlers) approaches. Buslane uses Python type annotations to register handlers properly.

Simple example:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

To install buslane, simply use pip:

$ pip install buslane
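The idea of registering handlers by type annotation, together with the event approach (zero or more handlers), can be sketched with the standard library alone. This is only an illustration of the technique, not buslane's implementation or API; EventBus, UserRegistered and send_welcome_mail are hypothetical names, and the sketch assumes every handler takes a parameter called event.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import get_type_hints


class EventBus:  # hypothetical, not buslane's class
    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, handler):
        # Read the type of the handler's 'event' parameter from its annotations.
        event_type = get_type_hints(handler)["event"]
        self._handlers[event_type].append(handler)

    def publish(self, event):
        # Zero handlers is fine for events; all registered ones are called.
        for handler in self._handlers[type(event)]:
            handler(event)


@dataclass(frozen=True)
class UserRegistered:
    email: str


sent = []


def send_welcome_mail(event: UserRegistered) -> None:
    sent.append(event.email)


bus = EventBus()
bus.register(send_welcome_mail)
bus.publish(UserRegistered(email="john@example.com"))
print(sent)  # ['john@example.com']
```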

Some time ago I wrote a library that might be useful for you. It allows you to have local and global listeners, multiple different ways of registering them, execution priority and so on.

from pyeventdispatcher import dispatch, register, Event

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Have a look at pyeventdispatcher.

If you need an event bus that works across process or network boundaries you can try PyMQ. It currently supports pub/sub, message queues and synchronous RPC. The default version works on top of a Redis backend, so you need a running Redis server. There is also an in-memory backend for testing. You can also write your own backend.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

To initialize the system:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Disclaimer: I am the author of this library

Licensed under: CC-BY-SA with attribution