質問

Pythonのマルチプロセッシングについて質問です。データセットを取得し、チャンクに分割し、それらのチャンクを同時実行プロセスに渡そうとしています。単純な計算を使用して大規模なデータテーブルを変換する必要があります (例:サーミスタの場合は電気抵抗 -> 温度)。

以下にリストされているコードはほぼ希望どおりに動作しますが、新しいプロセスを生成していないようです (生成している場合でも、一度に 1 つだけ)。私は Python を初めて使用するので、この問題にはおそらく非常に簡単な解決策があるでしょう。

前もって感謝します!

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()

if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')

    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')
役に立ちましたか?

解決

ちょうどget_nowaitを(使用し、各プロセスにチャンクの番号を送信)し、最終的Queue.Emptyの例外を処理する必要はありません。すべてのプロセスは、CPU時間の異なる量を取得し、これが忙しいそれらすべてを維持する必要があります。

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if name == 'main':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()

他のヒント

をオーバーライドしていません run 方法。プロセス (またはスレッド) でコードを実行するには 2 つの方法があります。

  1. ターゲットを指定してプロセスを作成する
  2. プロセスをサブクラス化し、 run 方法。

オーバーライド __init__ これは、プロセスがすべてドレスアップされ、行き場がないことを意味します。これは、実行する必要があることを実行するために必要な属性を与えるために使用する必要がありますが、実行するタスクを指定するべきではありません。

コードでは、すべての面倒な作業は次の行で行われます。

exec('worker'+str(i)+' = Worker(tmp)')

ここでは何も行われません:

exec('worker'+str(i)+'.start()')

したがって、結果を確認すると、 exec('print worker'+str(i)+'.result[0]') 意味のあるものを提供する必要がありますが、それは実行したいコードだけのためです もっている 実行されましたが、プロセスの開始時ではなく、プロセスの構築時に実行されました。

これを試して:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

編集:

わかった...だから私はここで自分の直感に基づいて飛んでいただけですが、それらはすべて間違っていました。私たち二人ともプロセスについて理解していなかった点は、変数を直接共有できないことです。開始するために新しいプロセスに渡すものはすべて、読み取られ、コピーされ、永久に失われます。データを共有する 2 つの標準的な方法のいずれかを使用する場合を除きます。 キューとパイプ. 。コードを動作させるために少し試してみましたが、今のところうまくいきません。それがあなたを正しい軌道に乗せると思います。

[OK]を、それはリストのように見えるので、スレッドセーフではありませんし、私は(はるかに遅いように見えるが)キューを使用して移動してきました。このコードは、基本的に私が何をしようとしていたものを実現します:

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top