Pergunta

É possível encerrar um segmento em execução sem configuração / verificar qualquer bandeiras / semáforos / etc.?

Foi útil?

Solução

Em geral, é um padrão ruim para matar um fio de forma abrupta, em Python e em qualquer idioma. Pense nos seguintes casos:

  • o thread está mantendo um recurso crítico que deve ser fechado corretamente
  • o segmento criou vários outros tópicos que devem ser mortos também.

A boa maneira de lidar com isso se você puder pagá-lo (se você está administrando suas próprias threads) é ter uma bandeira exit_request que cada fios controlos no intervalo regular para ver se é tempo para ele sair.

Por exemplo:

import threading

class StoppableThread(threading.Thread):
    """Thread class with a stop() method. The thread itself has to check
    regularly for the stopped() condition."""

    def __init__(self):
        super(StoppableThread, self).__init__()
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def stopped(self):
        return self._stop_event.is_set()

Neste código, você deve chamar stop() no segmento quando você quer que ele saia, e esperar para o segmento para sair corretamente usando join(). O fio deve verificar a bandeira parada em intervalos regulares.

Há casos, no entanto, quando você realmente precisa matar um fio. Um exemplo é quando você está enrolando uma biblioteca externa que está ocupado para chamadas de longa e você quer interrompê-lo.

O código a seguir permite que (com algumas restrições) para levantar uma exceção em um segmento de Python:

def _async_raise(tid, exctype):
    '''Raises an exception in the threads with id tid'''
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid),
                                                     ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # "if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """determines this (self's) thread id

        CAREFUL : this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.
        """
        if not self.isAlive():
            raise threading.ThreadError("the thread is not active")

        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id

        # no, look for it in the _active dict
        for tid, tobj in threading._active.items():
            if tobj is self:
                self._thread_id = tid
                return tid

        # TODO: in python 2.6, there's a simpler way to do : self.ident

        raise AssertionError("could not determine the thread's id")

    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.

        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:

            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.isAlive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )

        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.

        CAREFUL : this function is executed in the context of the
        caller thread, to raise an excpetion in the context of the
        thread represented by this instance.
        """
        _async_raise( self._get_my_tid(), exctype )

(Baseado em Killalble Threads por Tomer Filiba. A citação sobre o valor de retorno aparece PyThreadState_SetAsyncExc ser de uma versão antiga de Python ).

Como observado na documentação, isto não é uma bala mágica, porque se o segmento está ocupado fora do interpretador Python, ele não vai pegar a interrupção.

Um padrão de uso bom desse código é ter a captura rosca uma exceção específica e executar a limpeza. Dessa forma, você pode interromper uma tarefa e ainda ter a limpeza adequada.

Outras dicas

Não há nenhuma API oficial para fazer isso, não.

Você precisa usar a plataforma API para matar o fio, por exemplo, pthread_kill, ou TerminateThread. Você pode acessar tal API por exemplo através PythonWin, ou através ctypes.

Observe que este é inerentemente inseguro. Ele provavelmente vai levar a lixo incobráveis ??(de variáveis ??locais dos quadros de pilha que se tornam lixo), e pode levar a impasses, se o segmento de ser morto tem a GIL no momento em que ele está morto.

A multiprocessing.Process pode p.terminate()

Nos casos onde eu quero matar um segmento, mas não quer usar sinalizadores / travas / sinais / semáforos / eventos / whatever, eu promover os fios aos processos desenvolvido. Para código que faz uso de apenas alguns tópicos a sobrecarga não é tão ruim.

por exemplo. Isto vem a calhar para terminar facilmente auxiliares "threads" que executam o bloqueio I / O

A conversão é trivial: No código relacionado substituir todos threading.Thread com multiprocessing.Process e todos queue.Queue com multiprocessing.Queue e adicionar as chamadas necessárias de p.terminate() ao seu processo pai que quer matar sua p criança

Python doc

Se você está tentando terminar todo o programa você pode definir o segmento como um "daemon". Vejo Thread.daemon

Isto é baseado em thread2 - tópicos killable (Python receita)

Você precisa chamar PyThreadState_SetasyncExc (), que só está disponível através ctypes.

Esta só foi testado em Python 2.7.3, mas é provável que o trabalho com outras versões 2.x recentes.

import ctypes

def terminate_thread(thread):
    """Terminates a python thread from another thread.

    :param thread: a threading.Thread instance
    """
    if not thread.isAlive():
        return

    exc = ctypes.py_object(SystemExit)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), exc)
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

Você nunca deve forçosamente matar uma thread sem cooperar com ele.

Matar um fio remove quaisquer garantias de que try / finally blocos configurados de modo que você pode deixar fechaduras bloqueado, arquivos abertos, etc.

A única vez que você pode argumentar que tópicos força matando é uma boa idéia é matar um programa rápido, mas nunca tópicos individuais.

Como já foi mencionado, a norma é definir um sinalizador de parada. Para leve algo (sem subclasses de Thread, nenhuma variável global), uma chamada de retorno lambda é uma opção. (Observe os parênteses em if stop().)

import threading
import time

def do_work(id, stop):
    print("I am thread", id)
    while True:
        print("I am thread {} doing something".format(id))
        if stop():
            print("  Exiting loop.")
            break
    print("Thread {}, signing off".format(id))


def main():
    stop_threads = False
    workers = []
    for id in range(0,3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: done sleeping; time to stop the threads.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Finis.')

if __name__ == '__main__':
    main()

Substituir print() com uma função pr() que sempre libera (sys.stdout.flush()) pode melhorar a precisão da saída de shell.

(Apenas testado em Windows / Eclipse / Python3.3)

Em Python, você simplesmente não pode matar uma thread diretamente.

Se você realmente não precisa ter uma Thread (!), O que pode fazer, em vez de usar o rosqueamento pacote , é usar o multiprocessamento pacote . Aqui, para matar um processo, você pode simplesmente chamar o método:

yourProcess.terminate()  # kill the process!

Python vai matar o seu processo (no Unix através do sinal SIGTERM, enquanto no Windows por meio da chamada TerminateProcess()). Preste atenção para usá-lo enquanto estiver usando uma fila ou cachimbo! (Isso pode corromper os dados da Fila / Pipe)

Note que o multiprocessing.Event eo trabalho multiprocessing.Semaphore exatamente da mesma maneira do threading.Event eo threading.Semaphore respectivamente. Na verdade, os primeiros são clones dos dois Estados.

Se você realmente precisa usar um fio, não há nenhuma maneira de matá-lo diretamente. O que você pode fazer, no entanto, é usar um "thread daemon" . De fato, em Python, um segmento pode ser marcada como daemon :

yourThread.daemon = True  # set the Thread as a "daemon thread"

O programa principal vai sair quando há segmentos não-daemon vivas são deixados. Em outras palavras, quando seu segmento principal (que é, naturalmente, um segmento não-daemon) vai terminar as suas operações, o programa sairá mesmo que ainda existem alguns tópicos daemon de trabalho.

Note que é necessário definir uma rosca como daemon antes do método start() é chamado!

Claro que você pode, e deve, uso daemon mesmo com multiprocessing. Aqui, quando as principais saídas de processo, ele tenta encerrar todos os seus processos filhos demoníacos.

Finalmente, por favor, nota que sys.exit() e os.kill() não são escolhas.

Você pode matar um fio através da instalação de traço para o segmento que vai sair o fio. Veja o link anexado para uma possível implementação.

Matar um segmento em Python

É melhor se você não matar um fio. Uma maneira poderia ser a introdução de um bloco "try" no ciclo do segmento e para lançar uma exceção quando você quer parar a thread (por exemplo, uma pausa / retorno / ... que pára o seu para / while / ...). Eu usei isso no meu aplicativo e ele funciona ...

É definitivamente possível implementar um método Thread.stop como mostrado no seguinte exemplo de código:

import sys
import threading
import time


class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread


class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Cannot run thread with tracing!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace


class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################


def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()


def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)


def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

A classe Thread3 aparece para executar código aproximadamente 33% mais rápido do que a classe Thread2.

from ctypes import *
pthread = cdll.LoadLibrary("libpthread-2.15.so")
pthread.pthread_cancel(c_ulong(t.ident))

t é o objeto Thread.

Leia a fonte python (Modules/threadmodule.c e Python/thread_pthread.h) você pode ver a Thread.ident é um tipo pthread_t, para que você possa fazer qualquer coisa pthread pode fazer em uso python libpthread.

A seguir solução alternativa pode ser usado para matar um fio:

kill_threads = False

def doSomething():
    global kill_threads
    while True:
        if kill_threads:
            thread.exit()
        ......
        ......

thread.start_new_thread(doSomething, ())

Isto pode ser usado até mesmo para tópicos de terminação, cujo código é escrito em outro módulo, de thread principal. Podemos declarar uma variável global em que o módulo e usá-lo para terminar thread / s gerou nesse módulo.

Eu costumo usar isso para encerrar todos os fios na saída do programa. Isto pode não ser a maneira perfeita de terminar thread / s, mas poderia ajudar.

Uma coisa que eu quero acrescentar é que se você ler a documentação oficial em rosqueamento lib Python , é recomendado para uso evitar de threads "demoníacas", quando você não quer tópicos terminar abruptamente, com a bandeira que Paolo Rovelli mencionado .

De documentação oficial:

tópicos Daemon são abruptamente interrompido no desligamento. Seus recursos (como arquivos abertos, transações de banco de dados, etc.) não pode ser liberada corretamente. Se você quer que seus tópicos para parar graciosamente, torná-los não-demoníaca e usar um mecanismo de sinalização adequada, tal como um evento.

Eu acho que a criação de tópicos demoníacos depende de sua aplicação, mas em geral (e na minha opinião) é melhor evitar matá-los ou fazê-los demoníaca. Em multiprocessamento você pode usar is_alive() para verificar o status do processo e "encerrar" para concluir-los (Também a evitar problemas GIL). Mas você pode encontrar mais problemas, às vezes, quando você executar o seu código no Windows.

E lembre-se sempre que se você tem "threads ao vivo", o interpretador Python vai estar em execução para esperar deles. (Devido a isso pode ajudar daemonic você, se não importam termina abruptamente).

Se você está chamando explicitamente time.sleep() como parte de seu segmento (polling digamos algum serviço externo), uma melhoria sobre o método de Phillipe é usar o tempo limite no método event do wait() onde quer que sleep()

Por exemplo:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # If no kill signal is set, sleep for the interval,
            # If kill signal comes in while sleeping, immediately
            #  wake up and handle
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

Depois de executá-lo

t = KillableThread(sleep_interval=5)
t.start()
# Every 5 seconds it prints:
#: Do Something
t.kill()
#: Killing Thread

A vantagem de usar wait() vez de sleep()ing e verificar regularmente o evento é que você pode programar em intervalos mais longos de sono, o segmento é interrompido quase imediatamente (quando de outra forma seria sleep()ing) e na minha opinião, o código para manipulação saída é significativamente mais simples.

Estou muito tarde para este jogo, mas eu estive lutando com uma semelhante questão e aparecem os seguintes tanto para resolver o problema perfeitamente para mim e deixa-me fazer alguns testes estado do segmento básico e limpeza quando as saídas daemon sub-fio:

import threading
import time
import atexit

def do_work():

  i = 0
  @atexit.register
  def goodbye():
    print ("'CLEANLY' kill sub-thread with value: %s [THREAD: %s]" %
           (i, threading.currentThread().ident))

  while True:
    print i
    i += 1
    time.sleep(1)

t = threading.Thread(target=do_work)
t.daemon = True
t.start()

def after_timeout():
  print "KILL MAIN THREAD: %s" % threading.currentThread().ident
  raise SystemExit

threading.Timer(2, after_timeout).start()

Os rendimentos:

0
1
KILL MAIN THREAD: 140013208254208
'CLEANLY' kill sub-thread with value: 2 [THREAD: 140013674317568]

Há uma biblioteca construída para esta finalidade, stopit . Embora alguns dos mesmos cuidados listados aqui ainda se aplicam, pelo menos este presentes da biblioteca, uma técnica repetível regular para alcançar o objetivo declarado.

Isso parece funcionar com pywin32 no Windows 7

my_thread = threading.Thread()
my_thread.start()
my_thread._Thread__stop()

Esta é uma resposta ruim, ver os comentários

Aqui está como fazê-lo:

from threading import *

...

for thread in enumerate():
    if thread.isAlive():
        try:
            thread._Thread__stop()
        except:
            print(str(thread.getName()) + ' could not be terminated'))

Dê-lhe alguns segundos e depois a sua discussão deve ser interrompido. Verifique também o método thread._Thread__delete().

Eu recomendo um método thread.quit() por conveniência. Por exemplo, se você tem um soquete em seu segmento, eu recomendo a criação de um método quit() em sua classe Socket-handle, rescindir o socket, em seguida, executar um dentro thread._Thread__stop() do seu quit().

Inicie o sub segmento com setDaemon (True).

def bootstrap(_filename):
    mb = ModelBootstrap(filename=_filename) # Has many Daemon threads. All get stopped automatically when main thread is stopped.

t = threading.Thread(target=bootstrap,args=('models.conf',))
t.setDaemon(False)

while True:
    t.start()
    time.sleep(10) # I am just allowing the sub-thread to run for 10 sec. You can listen on an event to stop execution.
    print('Thread stopped')
    break

Embora seja bastante antigo, este pode ser um útil solução para alguns:

Um pouco módulo que estende a funcionalidade do módulo da rosca - permite que um segmento para exceções raise no contexto de outro fio. Ao elevar SystemExit, você pode finalmente matar threads python.

import threading
import ctypes     

def _async_raise(tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        # """if it returns a number greater than one, you're in trouble, 
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

class Thread(threading.Thread):
    def raise_exc(self, excobj):
        assert self.isAlive(), "thread must be started"
        for tid, tobj in threading._active.items():
            if tobj is self:
                _async_raise(tid, excobj)
                return

        # the thread was alive when we entered the loop, but was not found 
        # in the dict, hence it must have been already terminated. should we raise
        # an exception here? silently ignore?

    def terminate(self):
        # must raise the SystemExit type, instead of a SystemExit() instance
        # due to a bug in PyThreadState_SetAsyncExc
        self.raise_exc(SystemExit)

Assim, ele permite que um "fio de exceções raise no contexto de outro thread" e, desta forma, o fio terminado pode lidar com a rescisão sem verificar regularmente uma bandeira abort.

No entanto, de acordo com sua fonte original, existem alguns problemas com este código.

  • A exceção será levantada apenas quando execução python bytecode. Se o seu segmento chama um nativo / built-in função de bloqueio, o exceção será gerado apenas quando a execução retorna para o python código.
    • Há também um problema se o built-in função chama internamente PyErr_Clear (), o que efetivamente cancelar sua exceção pendente. Você pode tentar levantá-lo novamente.
  • Somente os tipos de exceção pode ser levantada com segurança. casos de exceção são susceptíveis de causar um comportamento inesperado, e são, portanto, restrito.
  • Eu pedi para expor essa função no módulo interno de rosca, mas desde ctypes tornou-se uma biblioteca padrão (a partir de 2,5), e este
    recurso não é provável que seja implementação agnóstico, pode ser mantido
    não exposto.

Pieter Hintjens - um dos fundadores da omq -project - diz, usando omq e evitando primitivas de sincronização, como fechaduras, exclusões mútuas, eventos etc., é a maneira mais sã e mais segura para escrever programas multi-threaded:

http://zguide.zeromq.org/py:all#Multithreading -com-ZeroMQ

Isso inclui dizer a um segmento de criança, que deveria cancelar o seu trabalho. Isso seria feito por equipar o fio com um omq-socket e sondagens em que soquete para uma mensagem dizendo que ele deve cancelar.

A ligação também fornece um exemplo em código de pitão de multi-threaded com omq.

Você pode executar o seu comando em um processo e depois matá-lo usando o ID do processo. Eu precisava de sincronização entre dois um fio de que não retorna por si só.

processIds = []

def executeRecord(command):
    print(command)

    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    processIds.append(process.pid)
    print(processIds[0])

    #Command that doesn't return by itself
    process.stdout.read().decode("utf-8")
    return;


def recordThread(command, timeOut):

    thread = Thread(target=executeRecord, args=(command,))
    thread.start()
    thread.join(timeOut)

    os.kill(processIds.pop(), signal.SIGINT)

    return;

Se você realmente precisa a capacidade de matar uma sub-tarefa, use uma implementação alternativa. multiprocessing e gevent apoio tanto indiscriminadamente matando um "thread".

rosqueamento do Python não suporta cancelamento. Nem tente. Seu código é muito provável a um impasse, corrupto ou memória vazamento, ou ter outros efeitos difíceis de depuração "interessantes" inesperadas que acontecem raramente e não-deterministicamente.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top