Cómo generar procesos secundarios paralelos en un sistema multi-procesador?
-
23-08-2019 - |
Pregunta
Tengo un script en Python que quiero usar como un controlador a otro script en Python. Tengo un servidor con 64 procesadores, por lo que quiero para desovar hasta 64 procesos hijos de este segundo script Python. El guión niño se llama:
$ python create_graphs.py --name=NAME
donde nombre es algo así como XYZ, ABC, etc. Universidad de Nueva York.
En mi script del controlador padres que recuperar la variable nombre de una lista:
my_list = [ 'XYZ', 'ABC', 'NYU' ]
Así que mi pregunta es, ¿cuál es la mejor manera para desovar fuera de estos procesos como los niños? Quiero limitar el número de niños a 64 a la vez, por lo que necesita para realizar un seguimiento del estado (si el proceso hijo ha terminado o no), así que puede mantener de manera eficiente toda la generación en ejecución.
La miré a utilizar el paquete de sub-proceso, pero la rechacé porque sólo genera un niño a la vez. Finalmente encontré el paquete multiprocesador, pero admito a ser abrumado por toda la documentación hilos vs. subprocesos.
En este momento, mi script usa subprocess.call
sólo para desovar un niño a la vez y se ve así:
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ 'XYZ', 'ABC', 'NYU' ]
if __name__ == '__main__':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
realmente lo quiero para desovar hasta 64 niños a la vez. En otras preguntas StackOverflow vi gente usando cola, pero parece que crea un impacto en el rendimiento?
Solución
Lo que se busca es el proceso rel="noreferrer"> clase piscina
Y aquí es un ejemplo de cálculo para que sea más fácil de entender. A continuación se divide 10000 tareas en procesos N donde N es el número de CPU. Tenga en cuenta que estoy pasando Ninguno como el número de procesos. Esto hará que la clase piscina de usar cpu_count para el número de procesos ( referencia) import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, ['ls'] * count)
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == '__main__':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
Otros consejos
Aquí está la solución que se me ocurrió, en base a Nadia y los comentarios de Jim. No estoy seguro de si se trata de la mejor manera, pero funciona. El guión original de ser llamado hijo tiene que ser un shell script porque tengo que utilizar algunas aplicaciones 3 ª parte incluyendo Matlab. Así que tuve que sacarlo de Python y codificarlo en bash.
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print 'Processing station:',staname
print 'Parent process:', os.getppid()
print 'Process id:', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
my_list = [ 'XYZ', 'ABC', 'NYU' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
¿Le parece esto como una solución razonable? He intentado utilizar el formato bucle while de Jim, pero mi guión acabo de volver nada. No estoy seguro de por qué eso sería. Aquí está la salida al ejecutar el script con Jim 'mientras que' bucle de la sustitución de la 'para' bucle:
hostname{me}2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
Cuando corro con el 'para' bucle, consigo algo más significativo:
hostname{me}6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
Así funciona esto, y estoy feliz. Sin embargo, todavía no entiendo por qué no puedo utilizar 'mientras que' bucle al estilo de Jim en lugar de la 'para' bucle que estoy utilizando. Gracias por toda la ayuda - Estoy impresionado con la amplitud de conocimientos @ stackoverflow
.Sin duda usar multiprocesamiento en lugar de rodar mi propia solución utilizando subproceso.
No creo que necesita cola a menos que pretenda obtener datos de las aplicaciones (que si usted desea que los datos, creo que puede ser más fácil para añadirlo a una base de datos de todos modos)
pero tratar esto en el tamaño:
poner el contenido de su guión create_graphs.py todo en una función llamada "create_graphs"
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
Sé que esto se traducirá en 1 hilos menos que los procesadores, lo cual es probablemente bueno, deja un procesador para manejar los hilos, disco I / O, y otras cosas que suceden en el equipo. Si decide que desea utilizar el último núcleo sólo tiene que añadir uno a ella
editar : Creo que pude haber malinterpretado el propósito de my_list. No es necesario my_list
para realizar un seguimiento de los hilos en absoluto (como todos están referenciados por los elementos de la lista threads
). Pero esta es una buena manera de alimentar a la entrada de procesos - o aún mejor: utilizar una función de generador;)
El propósito de my_list
y threads
my_list
contiene los datos que necesita para procesar en su función
threads
es sólo una lista de los temas que se están ejecutando
el bucle while hace dos cosas, iniciar nuevos temas para procesar los datos y comprobar si todos los hilos se hacen correr.
Así que, mientras usted tiene cualquiera de (a) más datos para procesar, o (b) temas que no están terminado de ejecutarse .... que desea programar para seguir funcionando. Una vez que ambas listas están vacías evaluarán a False
y el bucle while saldrán