Pergunta

Eu tenho um script Python que eu quero usar como um controlador para outro script Python. Eu tenho um servidor com 64 processadores, por isso quero gerar até 64 processos filhos desta segunda script Python. O script criança chama-se:

$ python create_graphs.py --name=NAME

onde NOME é algo como XYZ, ABC, NYU etc.

No meu script controlador pai eu recuperar a variável nome de uma lista:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Então, minha pergunta é, qual é a melhor maneira de gerar off esses processos como crianças? Eu quero limitar o número de crianças a 64 de cada vez, por isso, necessidade de acompanhar o status (se o processo filho terminar ou não) para que eu possa manter de forma eficiente toda a geração em execução.

Eu olhei para usando o pacote subprocesso, mas rejeitou-a, porque ele só gera uma criança de cada vez. Eu finalmente encontrei o pacote multiprocessador, mas eu admito a ser oprimido por todo o tópicos vs. documentação subprocessos.

Agora, meus usos de script subprocess.call para apenas um filho desova de cada vez e fica assim:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Eu realmente quero que ele gere até 64 crianças de cada vez. Em outras perguntas stackoverflow eu vi pessoas usando fila, mas parece que cria um impacto no desempenho?

Foi útil?

Solução

O que você está procurando é a processo classe piscina em multiprocessamento.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

E aqui está um exemplo de cálculo para torná-lo mais fácil de entender. A seguir irá dividir 10000 tarefas em processos de N, onde N é a contagem de cpu. Note que eu estou passando Nenhum como o número de processos. Isso fará com que a classe Pool para uso cpu_count para o número de processos ( referência )

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

Outras dicas

Aqui está a solução que eu vim acima, com base em Nadia e comentários de Jim. Não tenho a certeza se é a melhor maneira, mas funciona. O script criança original que está sendo chamado de necessidades para ser um shell script, porque eu preciso usar alguns 3rd partido apps incluindo Matlab. Então eu tive que tirá-lo de Python e código-lo em bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Isto parece como uma solução razoável? Eu tentei usar o formato de loop while de Jim, mas meu script apenas retornou nada. Eu não sou certo porque isso seria. Aqui é a saída quando eu executar o script com Jim do loop 'while' substituindo o 'para' loop:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Quando eu executá-lo com o 'para' loop, eu recebo algo mais significativo:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Então, isso funciona, e estou feliz. No entanto, eu ainda não entendo por que eu não posso usar Jim do loop 'while' estilo em vez do laço 'for' Eu estou usando. Obrigado por toda a ajuda -. Estou impressionado com a amplitude de conhecimento @ stackoverflow

Eu definitivamente utilizar multiprocessamento em vez de rolar minha própria solução usando subprocesso.

Eu não acho que você precisa de fila a menos que você pretende obter dados para fora das aplicações (Que se você deseja que os dados, eu acho que pode ser mais fácil para adicioná-lo a um banco de dados de qualquer maneira)

mas tentar este sobre para o tamanho:

colocar o conteúdo de seu script create_graphs.py tudo em uma função chamada "create_graphs"

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Eu sei que isso irá resultar em 1 menos fios do que os processadores, o que é provavelmente bom, deixa um processador de gerir os fios, disco I / O, e outras coisas acontecendo no computador. Se você decidir que quer usar o último núcleo basta adicionar um a ele

Editar : Eu acho que pode ter interpretado mal o propósito de my_list. Você não precisa my_list para acompanhar os fios em tudo (como eles estão todos referenciados pelos itens na lista threads). Mas esta é uma boa maneira de alimentar a entrada de processos - ou ainda melhor: usar uma função de gerador;)

O propósito da my_list e threads

my_list contém os dados que você precisa processo em sua função
threads é apenas uma lista dos tópicos atualmente em execução

o loop while faz duas coisas, começar novos tópicos para processar os dados e verificar se algum tópicos são feitas em execução.

Então, enquanto você tem (a) mais dados para processar, ou (b) tópicos que não estão executando acabado .... você quer programa para continuar funcionando. Uma vez que ambas as listas estão vazias que irá avaliar a False eo while sairá

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top