(我正在使用 py处理 在此示例中的模块,但是如果您运行,则用多处理替换处理可能应该可以工作 蟒蛇2.6 或使用 多处理向后移植)

我目前有一个程序可以监听unix套接字(使用processing.connection.Listener),接受连接并生成一个处理请求的线程。在某个时刻,我想优雅地退出进程,但由于accept()调用被阻塞,我看不出有什么办法可以很好地取消它。我至少有一种方法可以在这里(OS X)工作,设置一个信号处理程序并从另一个线程向该进程发出信号,如下所示:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

我考虑过的唯一其他方法是深入连接(listener._listener._socket)并设置非阻塞选项......但这可能会产生一些副作用,并且通常非常可怕。

有谁有更优雅(甚至可能是正确的!)的方法来实现这一点?它需要可移植到 OS X、Linux 和 BSD,但 Windows 可移植性等不是必需的。

澄清:谢谢大家!像往常一样,我原来的问题中的歧义被揭示了:)

  • 我需要在取消监听后执行清理,并且我并不总是想实际退出该进程。
  • 我需要能够从不是从同一父进程生成的其他进程访问此进程,这使得队列变得笨拙
  • 线程的原因是:
    • 他们访问共享状态。实际上或多或少是一个常见的内存数据库,所以我想它可以以不同的方式完成。
    • 我必须能够同时接受多个连接,但实际线程大多数时候都在阻塞。每个接受的连接都会产生一个新线程;这是为了不阻止所有客户端的 I/O 操作。

关于线程对比进程,我使用线程使阻塞操作变为非阻塞,并使用进程来启用多处理。

有帮助吗?

解决方案

这不是 select 的用途吗?

仅当 select 表明它不会阻塞时才调用套接字上的 Accept...

选择有超时,因此您可以偶尔突破以检查 如果是时候关机了......

其他提示

我以为我可以避免它,但看来我必须做这样的事情:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

我知道这违反了德米特定律并引入了一些邪恶的猴子补丁,但这似乎是实现此目的最容易移植的方法。如果有人有更优雅的解决方案,我会很高兴听到:)

我是多处理模块的新手,但在我看来,混合处理模块和线程模块是违反直觉的,它们不是旨在解决同样的问题吗?

不管怎样,将你的监听函数包装到一个进程本身怎么样?我不清楚这如何影响代码的其余部分,但这可能是一个更干净的替代方案。

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'

可能并不理想,但您可以通过从信号处理程序或终止进程的线程向套接字发送一些数据来释放该块。

编辑:实现这一点的另一种方法可能是使用 联系 队列, ,因为它们似乎支持超时(抱歉,我在第一次阅读时误读了您的代码)。

我遇到了同样的问题。我通过向侦听器发送“停止”命令解决了这个问题。在侦听器的主线程(处理传入消息的线程)中,每次收到新消息时,我只需检查它是否是“停止”命令并退出主线程。

这是我正在使用的代码:

def start(self):
    """
    Start listening
    """
    # set the command being executed
    self.command = self.COMMAND_RUN

    # startup the 'listener_main' method as a daemon thread
    self.listener = Listener(address=self.address, authkey=self.authkey)
    self._thread = threading.Thread(target=self.listener_main, daemon=True)
    self._thread.start()

def listener_main(self):
    """
    The main application loop
    """

    while self.command == self.COMMAND_RUN:
        # block until a client connection is recieved
        with self.listener.accept() as conn:

            # receive the subscription request from the client
            message = conn.recv()

            # if it's a shut down command, return to stop this thread
            if isinstance(message, str) and message == self.COMMAND_STOP:
                return

            # process the message

def stop(self):
    """
    Stops the listening thread
    """
    self.command = self.COMMAND_STOP
    client = Client(self.address, authkey=self.authkey)
    client.send(self.COMMAND_STOP)
    client.close()

    self._thread.join()

我使用身份验证密钥来防止黑客通过从任意客户端发送停止命令来关闭我的服务。

我的不是一个完美的解决方案。似乎更好的解决方案可能是修改代码 multiprocessing.connection.Listener, ,并添加一个 stop() 方法。但是,这需要将其发送给 Python 团队批准。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top