Question

Consider a system where I have events coming in at unpredictable points in time. I want to be able to perform a "deferred" action that executes a fixed amount of time, X units, after the last event has come in. An event is considered "last" if it is the only event to have occurred in the last X units of time. What is the most efficient way to do this in Python?

One solution I have considered is using a threading.Event:

# This solution has the drawback that the deferred event may actually occur
# up to 2*X units of time after the last event.
# Also, it kinda sucks that the thread is basically polling once the first
# event comes in.


from threading import Thread
from threading import Event
import time
import sys


evt = Event()
die = False
X = 1


def thread_func_event():
    while True:
        evt.wait()
        if die:
            break

        while True:
            evt.clear()
            time.sleep(X)

            if not evt.is_set():
                # No more events came in. Good.
                break
            # Looks like more events came in. Let's try again.

            if die:
                return

        print('Deferred action performed.')
        sys.stdout.flush()


def event_occurred():
    evt.set()


t = Thread(target=thread_func_event)
t.start()

for _ in range(0, 1000000):
    event_occurred()
print('First batch of events done.')
sys.stdout.flush()

time.sleep(3)

for _ in range(0, 1000000):
    event_occurred()
print('Second batch of events done.')
sys.stdout.flush()

time.sleep(3)
die = True
evt.set()
t.join()
Was it helpful?

Solution

I've done something like this before.

import threading
import time

class waiter(object):
    def __init__(self, action, delay = 0.5, *args, **kwargs):
        self.action_lockout_timeout = threading.Thread()
        self.action_lockout_event   = threading.Event()
        self.action                 = action
        self.delay                  = delay
        self.action_prevent()

    def action_prevent(self):
        def action_enable():
            self.action_lockout_event.wait(self.delay)
            if not self.action_lockout_event._Event__flag:
                self.action()

        if self.action_lockout_timeout.isAlive():
            self.action_lockout_event.set()
            self.action_lockout_timeout.join()
        self.action_lockout_event.clear()
        self.action_lockout_timeout = threading.Thread(target = action_enable)
        self.action_lockout_timeout.setDaemon(True)
        self.action_lockout_timeout.start()

def thanks():
    print("Person 2: Thank you ...")

polite = waiter(thanks, 3)
print("Person 1: After you")
polite.action_prevent()
time.sleep(2)
print("Person 2: No, after you")
polite.action_prevent()
time.sleep(2)
print("Person 1: No I insist")
polite.action_prevent()
time.sleep(2)
print("Person 2: But it would be rude")
polite.action_prevent()
time.sleep(2)
print("---Akward Silence---")
time.sleep(2)

If you want to run a function with arguments, just wrap it with a lambda expression.

def thanks(person):
    print("%s: Thank you ..." % person)

polite = waiter(lambda: thanks("Person 2"), 3)

EDIT:

Turns out that threading.Event is pretty slow. Here's a solution that replaces the Event with time.sleep and a bool. It also uses __slots__ to speed up attribute accesses

import sys
import threading
import time

class waiter(object):
    __slots__ = \
    [
        "action",
        "delay",
        "undelayed",
        "delay_timeout",
    ]
    def __init__(self, action, delay = 0.5, *args, **kwargs):
        self.action        = action
        self.delay         = delay
        self.undelayed     = False
        self.delay_timeout = threading.Thread(target = self.action_enable)
        self.delay_timeout.start()

    def action_prevent(self):
        self.undelayed = False

    def action_enable(self):
        while True:
            time.sleep(self.delay)
            if self.undelayed:
                self.action()
                break
            else:
                self.undelayed = True

def thanks():
    print("Person 2: Thank you ...")

polite = waiter(thanks, 1)
for _ in range(0, 1000000):
    polite.action_prevent()

print("First batch of events done.")
time.sleep(2)
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top