長時間実行プロセスでのSQLAlchemyセッション管理
-
07-07-2019 - |
質問
シナリオ:
- .NETベースのアプリケーションサーバー( Wonderware IAS / System Platform )工場フロアのさまざまな機器と通信する自動化オブジェクトをホストします。
- CPythonはこのアプリケーションサーバー内でホストされます( Python for .NET を使用)。
- オートメーションオブジェクトには、スクリプト機能が組み込まれています(カスタムの.NETベースの言語を使用)。これらのスクリプトはPython関数を呼び出します。
Python関数は、工場で進行中の作業を追跡するシステムの一部です。システムの目的は、プロセスに沿って生成されたウィジェットを追跡し、ウィジェットが正しい順序でプロセスを通過することを確認し、プロセスに沿って特定の条件が満たされていることを確認することです。ウィジェットの生産履歴とウィジェットの状態はリレーショナルデータベースに保存されます。これはSQLAlchemyがその役割を果たす場所です。
たとえば、ウィジェットがスキャナーを通過すると、自動化ソフトウェアは次のスクリプトをトリガーします(アプリケーションサーバーのカスタムスクリプト言語で記述されています):
' wiget_id and scanner_id provided by automation object
' ExecFunction() takes care of calling a CPython function
retval = ExecFunction("WidgetScanned", widget_id, scanner_id);
' if the python function raises an Exception, ErrorOccured will be true
' in this case, any errors should cause the production line to stop.
if (retval.ErrorOccured) then
ProductionLine.Running = False;
InformationBoard.DisplayText = "ERROR: " + retval.Exception.Message;
InformationBoard.SoundAlarm = True
end if;
スクリプトは WidgetScanned
python関数を呼び出します:
# pywip/functions.py
from pywip.database import session
from pywip.model import Widget, WidgetHistoryItem
from pywip import validation, StatusMessage
from datetime import datetime
def WidgetScanned(widget_id, scanner_id):
widget = session.query(Widget).get(widget_id)
validation.validate_widget_passed_scanner(widget, scanner) # raises exception on error
widget.history.append(WidgetHistoryItem(timestamp=datetime.now(), action=u"SCANNED", scanner_id=scanner_id))
widget.last_scanner = scanner_id
widget.last_update = datetime.now()
return StatusMessage("OK")
# ... there are a dozen similar functions
質問:このシナリオでSQLAlchemyセッションを最適に管理するにはどうすればいいですか?アプリケーションサーバーは長時間実行されるプロセスであり、通常は再起動の間に数か月間実行されます。アプリケーションサーバーはシングルスレッドです。
現在、私は次のようにしています:
アプリケーションサーバーで使用可能にする関数にデコレータを適用します。
# pywip/iasfunctions.py
from pywip import functions
def ias_session_handling(func):
def _ias_session_handling(*args, **kwargs):
try:
retval = func(*args, **kwargs)
session.commit()
return retval
except:
session.rollback()
raise
return _ias_session_handling
# ... actually I populate this module with decorated versions of all the functions in pywip.functions dynamically
WidgetScanned = ias_session_handling(functions.WidgetScanned)
質問:上記のデコレータは、長時間実行されるプロセスでセッションを処理するのに適していますか? session.remove()
を呼び出す必要がありますか?
SQLAlchemyセッションオブジェクトはスコープセッションです:
# pywip/database.py
from sqlalchemy.orm import scoped_session, sessionmaker
session = scoped_session(sessionmaker())
セッション管理を基本機能から除外したい。 2つの理由:
- 別の関数ファミリ、シーケンス関数があります。シーケンス関数は、いくつかの基本的な関数を呼び出します。 1つのシーケンス関数は、1つのデータベーストランザクションと等しくなければなりません。
- 他の環境からライブラリを使用できる必要があります。 a)TurboGears Webアプリケーションから。その場合、セッション管理はTurboGearsによって行われます。 b)IPythonシェルから。その場合、コミット/ロールバックは明示的になります。
(長い質問を本当にすみません。しかし、シナリオを説明する必要があると感じました。おそらく必要ではないでしょうか?)
解決
説明されているデコレータは長時間実行されるアプリケーションに適していますが、リクエスト間で誤ってオブジェクトを共有すると問題が発生する可能性があります。エラーをより早く表示し、何も破損しないようにするには、session.remove()でセッションを破棄することをお勧めします。
try:
try:
retval = func(*args, **kwargs)
session.commit()
return retval
except:
session.rollback()
raise
finally:
session.remove()
または with
コンテキストマネージャーを使用できる場合:
try:
with session.registry().transaction:
return func(*args, **kwargs)
finally:
session.remove()
ところで、検証で古いデータが実行されないように、クエリで .with_lockmode( 'update')
を使用することもできます。
他のヒント
WonderWare管理者にWonderware Historianへのアクセスを許可してもらうと、sqlalchemyを介したMSSQL呼び出しを介してタグの値を簡単に追跡でき、頻繁にポーリングできます。
別のオプションは、archestraツールキットを使用して内部タグの更新をリッスンし、サーバーをgalaxyのプラットフォームとしてデプロイして、リッスンできるようにすることです。