Question

For performance reasons, I've got a denormalized database where some tables contain data which has been aggregated from many rows in other tables. I'd like to maintain this denormalized data cache by using SQLAlchemy events. As an example, suppose I was writing forum software and wanted each Thread to have a column tracking the combined word count of all comments in the thread in order to efficiently display that information:

class Thread(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)

class Comment(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    thread_id = Column(UUID, ForeignKey('thread.id', ondelete='CASCADE'), nullable=False)
    thread = relationship('Thread', backref='comments')
    message = Column(UnicodeText(), nullable=False)
    
    @property
    def word_count(self):
        return len(self.message.split())

So every time a comment is inserted (for the sake of simplicity let's say that comments are never edited or deleted), we want to update the word_count attribute on the associated Thread object. So I'd want to do something like

def after_insert(mapper, connection, target):
    thread = target.thread
    thread.word_count = sum(c.word_count for c in thread.comments)
    print("updated cached word count to", thread.word_count)

event.listen(Comment, "after_insert", after_insert)

So when I insert a Comment, I can see the event firing and see that it has correctly calculated the word count, but that change is not saved to the Thread row in the database. I don't see any caveats about updated other tables in the after_insert documentation, though I do see some caveats in some of the others, such as after_delete.

So is there a supported way to do this with SQLAlchemy events? I'm already using SQLAlchemy events for lots of other things, so I'd like to do everything that way instead of having to write database triggers.

Was it helpful?

Solution

the after_insert() event is one way to do this, and you might notice it is passed a SQLAlchemy Connection object, instead of a Session as is the case with other flush related events. The mapper-level flush events are intended to be used normally to invoke SQL directly on the given Connection:

@event.listens_for(Comment, "after_insert")
def after_insert(mapper, connection, target):
    thread_table = Thread.__table__
    thread = target.thread
    connection.execute(
            thread_table.update().
             where(thread_table.c.id==thread.id).
             values(word_count=sum(c.word_count for c in thread.comments))
    )
    print "updated cached word count to", thread.word_count

what is notable here is that invoking an UPDATE statement directly is also a lot more performant than running that attribute change through the whole unit of work process again.

However, an event like after_insert() isn't really needed here, as we know the value of "word_count" before the flush even happens. We actually know it as Comment and Thread objects are associated with each other, and we could just as well keep Thread.word_count completely fresh in memory at all times using attribute events:

def _word_count(msg):
    return len(msg.split())

@event.listens_for(Comment.message, "set")
def set(target, value, oldvalue, initiator):
    if target.thread is not None:
        target.thread.word_count += (_word_count(value) - _word_count(oldvalue))

@event.listens_for(Comment.thread, "set")
def set(target, value, oldvalue, initiator):
    # the new Thread, if any
    if value is not None:
        value.word_count += _word_count(target.message)

    # the old Thread, if any
    if oldvalue is not None:
        oldvalue.word_count -= _word_count(target.message)

the great advantage of this method is that there's also no need to iterate through thread.comments, which for an unloaded collection means another SELECT is emitted.

still another method is to do it in before_flush(). Below is a quick and dirty version, which can be refined to more carefully analyze what has changed in order to determine if the word_count needs to be updated or not:

@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    for obj in session.new | session.dirty:
        if isinstance(obj, Thread):
            obj.word_count = sum(c.word_count for c in obj.comments)
        elif isinstance(obj, Comment):
            obj.thread.word_count = sum(c.word_count for c in obj.comments)

I'd go with the attribute event method as it is the most performant and up-to-date.

OTHER TIPS

You can do this with SQLAlchemy-Utils aggregated columns: http://sqlalchemy-utils.readthedocs.org/en/latest/aggregates.html

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top