我有希望作为控制器到另一个Python脚本使用Python脚本。我有64个处理器的服务器,所以要产卵到这个第二Python脚本的64个进程。孩子脚本名为:

$ python create_graphs.py --name=NAME

其中NAME是一样的东西XYZ,ABC,NYU等

在我的父控制器脚本余从列表中检索的名称变量:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

所以我的问题是,什么是产卵关闭这些进程是孩子最好的方法是什么?我想孩子的数量限制为64的时间,因此需要跟踪的状态(如果子进程已经完成与否)这样我就可以有效地保持整个发电持续。

我看着使用子包,但拒绝它,因为它仅在一个时间派生一个孩子。我终于找到了多处理器包,但我承认由全螺纹与子流程文档被淹没。

现在,我的脚本使用subprocess.call仅在一次产卵一个孩子,看起来像这样:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

我真的希望它在一个时间产卵多达64名儿童。在其他计算器的问题,我看到了使用队列的人,但它似乎是创建一个性能命中?

有帮助吗?

解决方案

你所寻找的是进程池类在多处理。

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

这是一个计算实例,使之更容易理解。下面将分为N个进程10000级的任务,其中N为CPU计数。请注意,我路过无作为的进程数。这将导致池类使用CPU_COUNT的进程数(参考

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

其他提示

下面是解决方案,我想出了基于纳迪亚和吉姆的意见。我不知道这是否是最好的方式,但它的工作原理。被称为原始的孩子剧本需要一个shell脚本,因为我需要使用一些第三方应用程序,包括Matlab的。因此,我不得不将其取出的Python和它在bash代码

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

这看起来像一个合理的解决方案?我试图用吉姆的while循环的格式,但我的脚本只是返回任何内容。我不知道为什么会。下面是输出时我与Jim的“而”运行脚本循环更换“for”循环:

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

当我运行它的“for”循环,我得到的东西更有意义:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

所以这个作品,我很高兴。不过,我仍然不知道为什么我不能用的,而不是“for”循环我用的吉姆的“而”风格循环。感谢所有帮助 - 我很深刻的印象,知识@计算器的广度

我肯定会使用多处理而不是使用子滚动我自己的解决方案。

我不认为你需要排队,除非你打算让数据从应用程序(如果确实需要的数据,我认为这可能是更容易将其添加到数据库反正)的

,但尝试这对尺寸:

把你create_graphs.py脚本的内容全部放入“create_graphs”功能

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

我知道,这将导致1个线程少于处理器,这也许是好事,它留下的处理器来管理线程,磁盘I / O,以及其他的事情发生在计算机上。如果决定要使用的最后一个核心只需添加一个到它

修改:我想可能误解my_list的目的。你不需要my_list跟踪线程在所有(因为他们都是由threads列表中的项目引用)。但是,这是供给处理输入的微细方式 - 或甚至更好:使用发电机功能;)

my_listthreads的目的

my_list认为,你需要在你的函数结果来处理数据 threads仅仅是一个当前正在运行的线程的列表

while循环做两件事情,启动新线程来处理数据,并检查是否有任何线程完成运行。

所以,只要你有是(a)更多的数据进行处理,或(b)未完成正在运行的线程....要编程继续运行。的一旦两个列表是空的,他们将评估为False和while循环将退出

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top