Как создать параллельные дочерние процессы в многопроцессорной системе?

StackOverflow https://stackoverflow.com/questions/884650

Вопрос

У меня есть скрипт на Python, который я хочу использовать в качестве контроллера для другого скрипта на Python.У меня есть сервер с 64 процессорами, поэтому я хочу запустить до 64 дочерних процессов этого второго скрипта Python.Вызывается дочерний скрипт:

$ python create_graphs.py --name=NAME

где НАЗВАНИЕ - это что-то вроде XYZ, ABC, NYU и т.д.

В моем скрипте родительского контроллера я извлекаю переменную name из списка:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Итак, мой вопрос в том, каков наилучший способ запустить эти процессы в качестве дочерних?Я хочу ограничить количество дочерних процессов до 64 одновременно, поэтому мне нужно отслеживать статус (завершен ли дочерний процесс или нет), чтобы я мог эффективно поддерживать работу всего поколения.

Я рассматривал возможность использования пакета subprocess, но отклонил его, потому что он одновременно создает только одного дочернего элемента.Я, наконец, нашел многопроцессорный пакет, но я признаю, что был перегружен целыми потоками по сравнениюдокументация по подпроцессам.

Прямо сейчас мой скрипт использует subprocess.call создавать только одного дочернего элемента одновременно и выглядит примерно так:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Я действительно хочу, чтобы одновременно в нем появлялось до 64 детей.В других вопросах stackoverflow я видел, как люди использовали Queue, но, похоже, это снижает производительность?

Это было полезно?

Решение

То, что вы ищете, - это пул процессов класс в многопроцессорной обработке.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

А вот пример расчета, чтобы было легче понять.Следующее разделит 10000 задач на N процессов, где N - количество процессоров.Обратите внимание, что я передаю None как количество процессов.Это приведет к тому, что класс пула будет использовать cpu_count для количества процессов (ссылка)

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

Другие советы

Вот решение, которое я придумал, основываясь на комментариях Нади и Джима.Я не уверен, что это лучший способ, но он работает.Вызываемый исходный дочерний скрипт должен быть сценарием оболочки, потому что мне нужно использовать некоторые сторонние приложения, включая Matlab.Поэтому мне пришлось взять это из Python и закодировать в bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Кажется ли это разумным решением?Я попытался использовать формат цикла while от Jim, но мой скрипт просто ничего не вернул.Я не уверен, почему это могло бы быть.Вот результат, когда я запускаю скрипт с циклом 'while' Джима, заменяющим цикл 'for':

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Когда я запускаю его с циклом 'for', я получаю нечто более значимое:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Итак, это работает, и я счастлив.Однако я все еще не понимаю, почему я не могу использовать цикл в стиле 'while' Джима вместо цикла 'for', который я использую.Спасибо за всю помощь - я впечатлен широтой знаний @ stackoverflow.

Я бы определенно использовал многопроцессорная обработка вместо того, чтобы внедрять мое собственное решение с помощью подпроцесса.

Я не думаю, что вам нужна очередь, если вы не собираетесь получать данные из приложений (которые, если вам действительно нужны данные, я думаю, что в любом случае может быть проще добавить их в базу данных)

но примерьте это на размер:

поместите содержимое вашего скрипта create_graphs.py все в функцию под названием "create_graphs"

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Я знаю, что это приведет к тому, что потоков будет на 1 меньше, чем процессоров, что, вероятно, хорошо, это оставляет процессор для управления потоками, дисковым вводом-выводом и другими вещами, происходящими на компьютере.Если вы решите, что хотите использовать последнее ядро, просто добавьте к нему одно

Редактировать:Я думаю, что, возможно, я неправильно истолковал назначение my_list .Вам не нужно my_list чтобы вообще отслеживать потоки (поскольку на все они ссылаются элементы в threads список).Но это прекрасный способ подачи входных данных процессам - или даже лучше:используйте функцию генератора ;)

Цель my_list и threads

my_list содержит данные, которые вам нужно обработать в вашей функции
threads это просто список запущенных в данный момент потоков

цикл while выполняет две вещи: запускает новые потоки для обработки данных и проверяет, завершены ли какие-либо потоки.

Таким образом, до тех пор, пока у вас есть либо (а) больше данных для обработки, либо (б) потоки, которые еще не завершены....вы хотите, чтобы программа продолжала работать. Как только оба списка станут пустыми, они будут оцениваться как False и цикл while завершится

Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top