I have a SQLite3 table with a lot of lines that a lot of processes will try to consume. Each line may be consumed by only one process. To ensure the mutual exclusion, I gave each process its id and each DB line a column consumed_by
indicating which process used it.
So, my locking mechanism consists of starting a transaction, getting a line that consumed_by
is NULL, updating the column to the id of the process, and commiting the transaction. If two process tries to fetch the same line, the serializable principle will ensure that only one of them can commit the transaction.
Using pure sqlite, I managed to apply this locking concept, but with SQLAlchemy some lines were consumed by more than one process. This is how I have implemented my test:
I have the following class:
class ToConsume(Base):
__tablename__ = 'to_consume'
id = Column(Integer, primary_key=True)
consumed_by = Column(Integer,nullable=True)
I have the following function that is called by 100 different process:
def consume(id):
log = open("log/"+str(id),"w")
while True:
session = None
try:
session = Session()
consume = session.query(ToConsume).filter_by(consumed_by=None).first()
if consume is None:
break
consume.consumed_by = id
session.commit()
log.write(str(consume.id)+"\n")
log.flush()
except Exception:
if session is not None:
session.rollback()
log.close()
In the start of the file, I initialized SQLAlchemy the following way:
engine = sqlalchemy.create_engine('sqlite:///multithread.db', echo=False,connect_args={'timeout': 30})
def _sqlite_pragmas(dbapi_con, con_record):
dbapi_con.execute('PRAGMA synchronous = 0;')
sqlalchemy.event.listen(engine, 'connect', _sqlite_pragmas)
Base = sqlalchemy.ext.declarative.declarative_base()
Session = sqlalchemy.orm.sessionmaker(bind=engine)
My database starts with 1000 rows. Running 100 independents python process (I am not using threads), I expected that the concatenation of all log files would result in 1000 lines, but I get more than 3000 lines.
I am having a hard time trying to understand why my mutual exclusion does not work when using SQLAlchemy. Note that when I use purely sqlite the problem does not occur.