Python の動的プロセス
-
09-09-2019 - |
質問
Pythonのマルチプロセッシングについて質問です。データセットを取得し、チャンクに分割し、それらのチャンクを同時実行プロセスに渡そうとしています。単純な計算を使用して大規模なデータテーブルを変換する必要があります (例:サーミスタの場合は電気抵抗 -> 温度)。
以下にリストされているコードはほぼ希望どおりに動作しますが、新しいプロセスを生成していないようです (生成している場合でも、一度に 1 つだけ)。私は Python を初めて使用するので、この問題にはおそらく非常に簡単な解決策があるでしょう。
前もって感謝します!
from multiprocessing import Process class Worker(Process): # example data transform def process(self, x): return (x * 2) / 3 def __init__(self, list): self.data = list self.result = map(self.process, self.data) super(Worker, self).__init__() if __name__ == '__main__': start = datetime.datetime.now() dataset = range(10000) # null dataset processes = 3 for i in range(processes): chunk = int(math.floor(len(dataset) / float(processes))) if i + 1 == processes: remainder = len(dataset) % processes else: remainder = 0 tmp = dataset[i * chunk : (i + 1) * chunk + remainder] exec('worker'+str(i)+' = Worker(tmp)') exec('worker'+str(i)+'.start()') for i in range(processes): exec('worker'+str(i)+'.join()') # just a placeholder to make sure the initial values of the set are as expected exec('print worker'+str(i)+'.result[0]')
解決
ちょうどget_nowaitを(使用し、各プロセスにチャンクの番号を送信)し、最終的Queue.Emptyの例外を処理する必要はありません。すべてのプロセスは、CPU時間の異なる量を取得し、これが忙しいそれらすべてを維持する必要があります。
import multiprocessing, Queue
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output):
self.input = input
self.output = output
super(Worker, self).__init__()
def run(self):
try:
while True:
self.output.put(self.process(self.input.get_nowait()))
except Queue.Empty:
pass
if name == 'main':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
Worker(input, output).start()
for i in range(len(dataset)):
print output.get()
他のヒント
をオーバーライドしていません run
方法。プロセス (またはスレッド) でコードを実行するには 2 つの方法があります。
- ターゲットを指定してプロセスを作成する
- プロセスをサブクラス化し、
run
方法。
オーバーライド __init__
これは、プロセスがすべてドレスアップされ、行き場がないことを意味します。これは、実行する必要があることを実行するために必要な属性を与えるために使用する必要がありますが、実行するタスクを指定するべきではありません。
コードでは、すべての面倒な作業は次の行で行われます。
exec('worker'+str(i)+' = Worker(tmp)')
ここでは何も行われません:
exec('worker'+str(i)+'.start()')
したがって、結果を確認すると、 exec('print worker'+str(i)+'.result[0]')
意味のあるものを提供する必要がありますが、それは実行したいコードだけのためです もっている 実行されましたが、プロセスの開始時ではなく、プロセスの構築時に実行されました。
これを試して:
class Worker(Process):
# example data transform
def process(self, x): return (x * 2) / 3
def __init__(self, list):
self.data = list
self.result = []
super(Worker, self).__init__()
def run(self):
self.result = map(self.process, self.data)
編集:
わかった...だから私はここで自分の直感に基づいて飛んでいただけですが、それらはすべて間違っていました。私たち二人ともプロセスについて理解していなかった点は、変数を直接共有できないことです。開始するために新しいプロセスに渡すものはすべて、読み取られ、コピーされ、永久に失われます。データを共有する 2 つの標準的な方法のいずれかを使用する場合を除きます。 キューとパイプ. 。コードを動作させるために少し試してみましたが、今のところうまくいきません。それがあなたを正しい軌道に乗せると思います。
[OK]を、それはリストのように見えるので、スレッドセーフではありませんし、私は(はるかに遅いように見えるが)キューを使用して移動してきました。このコードは、基本的に私が何をしようとしていたものを実現します:
import math, multiprocessing
class Worker(multiprocessing.Process):
def process(self, x):
for i in range(15):
x += (float(i) / 2.6)
return x
def __init__(self, input, output, chunksize):
self.input = input
self.output = output
self.chunksize = chunksize
super(Worker, self).__init__()
def run(self):
for x in range(self.chunksize):
self.output.put(self.process(self.input.get()))
if __name__ == '__main__':
dataset = range(10)
processes = multiprocessing.cpu_count()
input = multiprocessing.Queue()
output = multiprocessing.Queue()
for obj in dataset:
input.put(obj)
for i in range(processes):
chunk = int(math.floor(len(dataset) / float(processes)))
if i + 1 == processes:
remainder = len(dataset) % processes
else: remainder = 0
Worker(input, output, chunk + remainder).start()
for i in range(len(dataset)):
print output.get()