マルチプロセッサ システム上で並列子プロセスを生成するにはどうすればよいですか?
-
23-08-2019 - |
質問
別の Python スクリプトへのコントローラーとして使用したい Python スクリプトがあります。64 個のプロセッサを備えたサーバーがあるため、この 2 番目の Python スクリプトの最大 64 個の子プロセスを生成したいと考えています。子スクリプトの名前は次のとおりです。
$ python create_graphs.py --name=NAME
ここで、NAME は XYZ、ABC、NYU などです。
親コントローラー スクリプトでは、リストから name 変数を取得します。
my_list = [ 'XYZ', 'ABC', 'NYU' ]
そこで私の質問は、これらのプロセスを子として生成する最善の方法は何でしょうか?子の数を一度に 64 に制限したいので、世代全体を効率的に実行し続けることができるようにステータス (子プロセスが終了したかどうか) を追跡する必要があります。
subprocess パッケージの使用を検討しましたが、一度に 1 つの子しか生成されないため、拒否しました。ようやくマルチプロセッサ パッケージを見つけましたが、スレッド全体とスレッド全体に圧倒されていることは認めます。ドキュメントをサブプロセスします。
現時点では、私のスクリプトでは subprocess.call
一度に 1 つの子だけを生成すると、次のようになります。
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ 'XYZ', 'ABC', 'NYU' ]
if __name__ == '__main__':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
一度に 64 人の子供を生成したいと考えています。他の stackoverflow の質問で、Queue を使用している人を見ましたが、それによってパフォーマンスが低下するようですか?
解決
あなたが探していることはプロセスプールのクラスですマルチプロセッシングインチ
import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, ['ls'] * count)
そして、ここでは分かりやすくするために、計算例があります。以下は、Nは、CPU数であるNプロセス上の10000個のタスクを分割します。私はプロセスの数としてNoneを渡していないんだということに注意してください。これは、プールのクラスは、プロセス数のためCPU_COUNTを使用するようになります(参照する)
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == '__main__':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
他のヒント
ここで私はナディアとジムのコメントに基づいて、思いついたソリューションです。私はそれが最善の方法であるかどうかわからないのですが、それは動作します。私はMatlabのを含め、いくつかのサードパーティ製のアプリを使用する必要があるためと呼ばれているオリジナルの子スクリプトは、シェルスクリプトである必要があります。だから私は、Pythonの外にそれを取るとbashでそれをコーディングする必要がありました。
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print 'Processing station:',staname
print 'Parent process:', os.getppid()
print 'Process id:', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
my_list = [ 'XYZ', 'ABC', 'NYU' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
これは合理的な解決策のように見えるのか?私はジムのwhileループの形式を使用しようとしましたが、私のスクリプトはただ何も返されません。私はそれは次のようになり、なぜわかりません。私はジムの「しばらく」でスクリプトを実行すると、ここで出力されます置き換えるループループ「の」
hostname{me}2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
するとき、私は「for」ループ、私はより意味のある何かを得ると、それを実行します:
hostname{me}6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
これは動作しますが、私は幸せです。スタイルのループの代わりに、私が使用しています「の」ループ「しながら、」私はジムのを使用できない理由しかし、私はまだ得ることはありません。すべての助けをありがとう - 私は、知識の@のstackoverflowの広さに感銘を受けています。
。アプリケーションからデータを取得するつもりがない限り、キューは必要ないと思います(データが必要な場合は、とにかくデータベースに追加する方が簡単だと思います)
サイズについてはこれを試着してください:
create_graphs.py スクリプトの内容をすべて「create_graphs」という関数に入れます。
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
これにより、プロセッサよりもスレッドが 1 つ少なくなることがわかっています。これはおそらく良いことであり、プロセッサはスレッド、ディスク I/O、およびコンピュータ上で発生するその他の処理を管理できるようになります。最後のコアを使用する場合は、それに 1 つ追加するだけです
編集:my_list の目的を誤解している可能性があります。いらないよ my_list
スレッドをすべて追跡するため (スレッドはすべて、 threads
リスト)。しかし、これはプロセスに入力を供給する優れた方法です。あるいは、さらに良い方法です。ジェネレーター関数を使用してください;)
の目的 my_list
そして threads
my_list
関数で処理する必要があるデータを保持します
threads
現在実行中のスレッドの単なるリストです
while ループは 2 つのことを行います。データを処理するために新しいスレッドを開始し、実行中のスレッドがあるかどうかを確認します。
したがって、(a) 処理するデータがまだあるか、(b) 実行が完了していないスレッドがある限り...実行を継続するようにプログラムしたい。 両方のリストが空になると、次のように評価されます。 False
while ループは終了します