受け入れをキャンセルして Python 処理/マルチプロセッシング リスナー接続を閉じる適切な方法
-
21-08-2019 - |
質問
(私が使っているのは、 pyprocessing この例では module を使用していますが、processing を multiprocessing に置き換えて実行するとおそらく機能するはずです。 Python2.6 または、 マルチプロセッシングバックポート)
現在、UNIXソケットをリッスンし(processing.connection.Listenerを使用)、接続を受け入れ、リクエストを処理するスレッドを生成するプログラムがあります。ある時点でプロセスを正常に終了したいのですが、accept() 呼び出しがブロックされているため、適切にキャンセルする方法が見つかりません。少なくともここ(OS X)で機能する方法が1つあり、次のようにシグナルハンドラーを設定し、別のスレッドからプロセスにシグナルを送ります。
import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno
# This is actually called by the connection handler.
def closeme():
time.sleep(1)
print 'Closing socket...'
listener.close()
os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)
oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)
listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
listener.accept()
except socket.error, e:
if e.args[0] != errno.EINTR:
raise
# Cleanup here...
print 'Done...'
私が考えた他の唯一の方法は、接続 (listener._listener._socket) の奥深くまで到達して、非ブロッキング オプションを設定することです...しかし、これにはおそらくいくつかの副作用があり、一般的に非常に怖いです。
これを達成するためのよりエレガントな (そしておそらく正しい!) 方法を持っている人はいますか?OS X、Linux、BSD に移植できる必要がありますが、Windows の移植性などは必要ありません。
説明:皆さんありがとう!いつものように、私の元の質問のあいまいさが明らかになりました:)
- リスニングをキャンセルした後にクリーンアップを実行する必要がありますが、実際にそのプロセスを終了したいとは限りません。
- 同じ親から生成されていない他のプロセスからこのプロセスにアクセスできる必要があるため、キューが扱いにくくなります
- スレッドが発生する理由は次のとおりです。
- 彼らは共有状態にアクセスします。実際には多かれ少なかれ一般的なインメモリデータベースなので、別の方法で実行できると思います。
- 複数の接続を同時に受け入れられるようにする必要がありますが、実際のスレッドはほとんどの場合、何らかの理由でブロックされています。受け入れられた接続ごとに新しいスレッドが生成されます。これは、I/O 操作ですべてのクライアントをブロックしないようにするためです。
スレッドと対についてプロセスでは、ブロッキング操作を非ブロッキングにするためにスレッドを使用し、マルチプロセッシングを有効にするためにプロセスを使用します。
解決
イマイチ何を選択することは??のためのものであること。
のみ...選択は、それがブロックされませんを示している場合、ソケット上で受け入れを呼び出す
を選択し、タイムアウトを持っているので、あなたがチェックするために時々抜け出すことができます その時間がシャットダウンする場合は....
他のヒント
私は、私はそれを避けることができると思ったが、私がこのような何かをしなければならないようです。
from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()
import select
l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
print "Accepting..."
c = l.accept()
# ...
私は、これがデメテルの法則を破ると、いくつかの邪悪な猿のパッチを導入していることを認識していますが、これはこれを達成する最も簡単にポート方法だろうと思われます。誰もがよりエレガントな解決策を持っている場合、私はそれを聞いて幸せになる:)
私はマルチプロセッシングモジュールに新しいですが、処理モジュールとthreadingモジュールを混合することは直感的であるように私には思える、彼らは同じ問題を解決をターゲットされていませんか?
とにかく、どのようにプロセス自体にあなたの耳を傾けた機能を包むでしょうか?私は、これはあなたのコードの残りの部分にどのように影響するかは明らかではないんだけど、これはクリーンな代替かもしれません。
from multiprocessing import Process
from multiprocessing.connection import Listener
class ListenForConn(Process):
def run(self):
listener = Listener('/tmp/asdf', 'AF_UNIX')
listener.accept()
# do your other handling here
listen_process = ListenForConn()
listen_process.start()
print listen_process.is_alive()
listen_process.terminate()
listen_process.join()
print listen_process.is_alive()
print 'No more listen process.'
おそらく理想的ではありませんが、あなたはプロセスを終了されたシグナルハンドラまたはスレッドからソケットにいくつかのデータを送信することによって、ブロックを解放することができます。
編集:これは<ストライキ>接続ストライキ> <のhref = "http://docs.python.org/dev/library/multiprocessing.html#pipes-and-を使用するのが良いかもしれない実装するための別の方法キュー」のrel = 『nofollowをnoreferrer』>キューを、彼らはタイムアウトをサポートするように見えるので、(謝罪を、私は私の最初の読み取りでコードを読み違える)。
私は、同じ問題に遭遇しました。私は、リスナーに「停止」コマンドを送信することによってそれを解決しました。リスナーのメインスレッド(受信メッセージを処理し1)、新しいメッセージが受信されるたびに、私はちょうどそれがメインスレッドのうち、「停止」コマンドと出るかどうかを確認します。
ここで私が使用しているコードです
def start(self):
"""
Start listening
"""
# set the command being executed
self.command = self.COMMAND_RUN
# startup the 'listener_main' method as a daemon thread
self.listener = Listener(address=self.address, authkey=self.authkey)
self._thread = threading.Thread(target=self.listener_main, daemon=True)
self._thread.start()
def listener_main(self):
"""
The main application loop
"""
while self.command == self.COMMAND_RUN:
# block until a client connection is recieved
with self.listener.accept() as conn:
# receive the subscription request from the client
message = conn.recv()
# if it's a shut down command, return to stop this thread
if isinstance(message, str) and message == self.COMMAND_STOP:
return
# process the message
def stop(self):
"""
Stops the listening thread
"""
self.command = self.COMMAND_STOP
client = Client(self.address, authkey=self.authkey)
client.send(self.COMMAND_STOP)
client.close()
self._thread.join()
私は、任意のクライアントからの停止コマンドを送信することによって、自分のサービスをシャットダウンからハッカーになる防ぐために認証キーを使用しています。
鉱山は完璧なソリューションではありません。よりよい解決策がmultiprocessing.connection.Listener
内のコードを修正し、stop()
メソッドを追加するかもしれないようです。しかし、それはPythonのチームによる承認のプロセスを通してそれを送ることが必要になります。