Управление сеансом SQLAlchemy в длительном процессе
-
07-07-2019 - |
Вопрос
Сценарий:
- Сервер приложений на базе .NET (Wonderware IAS / Системная платформа) содержит объекты автоматизации, которые взаимодействуют с различным оборудованием на заводе.
- CPython размещен внутри этого сервера приложений (используя Python для .NET).
- Объекты автоматизации имеют встроенную функциональность написания сценариев (с использованием пользовательского языка на основе .NET).Эти скрипты вызывают функции Python.
Функции Python являются частью системы для отслеживания незавершенного производства на заводе.Цель системы - отслеживать созданные виджеты по ходу процесса, обеспечивать, чтобы виджеты проходили процесс в правильном порядке, и проверять, выполняются ли определенные условия по ходу процесса.История создания виджета и состояние виджета хранятся в реляционной базе данных, именно здесь SQLAlchemy играет свою роль.
Например, когда виджет проходит сканер, программное обеспечение автоматизации запускает следующий сценарий (написанный на пользовательском языке сценариев сервера приложений)::
' wiget_id and scanner_id provided by automation object
' ExecFunction() takes care of calling a CPython function
retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
' if the python function raises an Exception, ErrorOccured will be true
' in this case, any errors should cause the production line to stop.
if (retval.ErrorOccured) then
ProductionLine.Running = False;
InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
InformationBoard.SoundAlarm = True
end if;
Сценарий вызывает WidgetScanned
функция python:
# pywip/functions.py
from pywip.database import session
from pywip.model import Widget, WidgetHistoryItem
from pywip import validation, StatusMessage
from datetime import datetime
def WidgetScanned(widget_id, scanner_id):
widget = session.query(Widget).get(widget_id)
validation.validate_widget_passed_scanner(widget, scanner) # raises exception on error
widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
widget.last_scanner = scanner_id
widget.last_update = datetime.now()
return StatusMessage("OK")
# ... there are a dozen similar functions
Мой вопрос заключается в следующем: Как мне наилучшим образом управлять сеансами SQLAlchemy в этом сценарии? Сервер приложений - это длительный процесс, обычно между перезапусками проходят месяцы.Сервер приложений является однопоточным.
В настоящее время я делаю это следующим образом:
Я применяю декоратор к функциям, которые я делаю доступными для сервера приложений:
# pywip/iasfunctions.py
from pywip import functions
def ias_session_handling(func):
def _ias_session_handling(*args, **kwargs):
try:
retval = func(*args, **kwargs)
session.commit()
return retval
except:
session.rollback()
raise
return _ias_session_handling
# ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
WidgetScanned = ias_session_handling(functions.WidgetScanned)
Вопрос: Подходит ли описанный выше декоратор для обработки сеансов в длительном процессе? Должен ли я позвонить session.remove()
?
Объект сеанса SQLAlchemy - это сеанс с ограниченной областью действия:
# pywip/database.py
from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker())
Я хочу исключить управление сеансами из основных функций.По двум причинам:
- Существует еще одно семейство функций, функции последовательности.Функции последовательности вызывают несколько базовых функций.Одна функция последовательности должна равняться одной транзакции базы данных.
- Мне нужно иметь возможность использовать библиотеку из других сред.а) Из веб-приложения TurboGears.В этом случае управление сеансом осуществляется TurboGears.б) Из оболочки IPython.В этом случае фиксация / откат будут явными.
(Я искренне извиняюсь за длинный вопрос.Но я чувствовал, что мне нужно объяснить сценарий.Может быть, в этом нет необходимости?)
Решение
Описанный декоратор подходит для длительно работающих приложений, но вы можете столкнуться с проблемами, если случайно предоставите общий доступ к объектам между запросами.Чтобы ошибки появлялись раньше и ничего не повреждали, лучше отменить сеанс с помощью session.remove().
try:
try:
retval = func(*args, **kwargs)
session.commit()
return retval
except:
session.rollback()
raise
finally:
session.remove()
Или, если вы можете использовать with
контекстный менеджер:
try:
with session.registry().transaction:
return func(*args, **kwargs)
finally:
session.remove()
Кстати, возможно, вы захотите использовать .with_lockmode('update')
в запросе, чтобы ваша проверка не выполнялась на устаревших данных.
Другие советы
Попросите своего администратора WonderWare предоставить вам доступ к Wonderware Historian, вы можете довольно легко отслеживать значения тегов с помощью вызовов MSSQL через sqlalchemy, которые вы можете периодически опрашивать.
Другой вариант - использовать archestra toolkit для прослушивания обновлений внутренних тегов и иметь сервер, развернутый в качестве платформы в Galaxy, с которого вы можете прослушивать.