Question

I have a SQLite3 table with many rows that many processes will try to consume. Each row may be consumed by only one process. To ensure mutual exclusion, I gave each process an id and added a consumed_by column to each row indicating which process used it.

So my locking mechanism consists of starting a transaction, selecting a row whose consumed_by is NULL, updating that column to the process id, and committing the transaction. If two processes try to fetch the same row, serializable isolation should ensure that only one of them can commit the transaction.
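
For context, a claim loop of this shape written against the sqlite3 module directly can look roughly like this (a simplified sketch, not my exact test code; BEGIN IMMEDIATE is used here as one way to make the select-and-update atomic, and the names match the schema below):

import sqlite3

def consume_raw(proc_id, db_path="multithread.db"):
    # isolation_level=None disables the driver's implicit transaction
    # handling, so the BEGIN/COMMIT below are the only ones issued.
    con = sqlite3.connect(db_path, timeout=30, isolation_level=None)
    while True:
        # Take the write lock up front so the SELECT and the UPDATE
        # form one atomic claim.
        con.execute("BEGIN IMMEDIATE")
        row = con.execute(
            "SELECT id FROM to_consume WHERE consumed_by IS NULL LIMIT 1"
        ).fetchone()
        if row is None:
            con.execute("COMMIT")
            break
        con.execute("UPDATE to_consume SET consumed_by = ? WHERE id = ?",
                    (proc_id, row[0]))
        con.execute("COMMIT")
    con.close()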

Using the sqlite3 module directly, I managed to apply this locking concept, but with SQLAlchemy some rows were consumed by more than one process. This is how I have implemented my test:

I have the following class:

class ToConsume(Base):
    __tablename__ = 'to_consume'
    id = Column(Integer, primary_key=True)
    consumed_by = Column(Integer, nullable=True)

I have the following function, which is called by 100 different processes:

def consume(id):
    # Each process writes the ids it claims to its own log file.
    log = open("log/" + str(id), "w")
    while True:
        session = None
        try:
            session = Session()
            # Pick the first row that nobody has claimed yet...
            consume = session.query(ToConsume).filter_by(consumed_by=None).first()
            if consume is None:
                break
            # ...and mark it as consumed by this process.
            consume.consumed_by = id
            session.commit()
            log.write(str(consume.id) + "\n")
            log.flush()
        except Exception:
            # On any error (e.g. a lock conflict), roll back and retry.
            if session is not None:
                session.rollback()
    log.close()

At the top of the file, I initialize SQLAlchemy as follows:

import sqlalchemy
import sqlalchemy.event
import sqlalchemy.ext.declarative
import sqlalchemy.orm

engine = sqlalchemy.create_engine('sqlite:///multithread.db', echo=False, connect_args={'timeout': 30})
def _sqlite_pragmas(dbapi_con, con_record):
    # synchronous = 0 skips fsync on commit: faster, but not durable on crash.
    dbapi_con.execute('PRAGMA synchronous = 0;')
sqlalchemy.event.listen(engine, 'connect', _sqlite_pragmas)

Base = sqlalchemy.ext.declarative.declarative_base()
Session = sqlalchemy.orm.sessionmaker(bind=engine)

My database starts with 1000 rows. Running 100 independent Python processes (I am not using threads), I expected the concatenation of all the log files to contain 1000 lines, but I get more than 3000.
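
This can be checked by counting duplicate ids across the per-process log files, for instance with a short script along these lines (the log/ directory is the one used by consume() above):

import collections
import glob

# Count how many times each row id shows up across all log files.
counts = collections.Counter()
for path in glob.glob("log/*"):
    with open(path) as f:
        for line in f:
            counts[int(line)] += 1

dupes = [row_id for row_id, n in counts.items() if n > 1]
print(len(counts), "distinct rows,", len(dupes), "rows consumed more than once")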

I am having a hard time understanding why my mutual exclusion does not work with SQLAlchemy. Note that when I use sqlite3 directly the problem does not occur.


Solution

SQLite access through the Python sqlite3 (pysqlite) driver is not serializable by default. The driver has long-standing issues in its transaction handling (see http://bugs.python.org/issue9924 for background): it does not emit BEGIN until it sees a DML statement, so your initial SELECT runs in autocommit mode and two processes can both read the same unclaimed row before either transaction has actually begun. To use serializable isolation with sqlite3/SQLAlchemy, see the SQLAlchemy documentation for the SQLite dialect. Example:

from sqlalchemy import create_engine, event

engine = create_engine("sqlite:///myfile.db", isolation_level='SERIALIZABLE')

@event.listens_for(engine, "begin")
def do_begin(conn):
    # Emit an explicit BEGIN so the transaction is open before the first SELECT.
    conn.execute("BEGIN")
Licensed under: CC-BY-SA with attribution