質問

どのような利用 マルチプロセ に取り組む embarrassingly並列問題?

Embarassingly並列問題はあるかないかの基本的な部品:

  1. 読む 入力データファイルから、データベース、tcp接続等)。
  2. 計算の入力データを、それぞれの計算 独立したその他の計算.
  3. 結果の算定(ファイル、データベース、tcp接続等)。

できる並列プログラムの二つの外形寸法:

  • 第2部で複数のコアで、それぞれの計算は独立した処理の順番は関係ありません。
  • 各部が独立して実行。第1部ではデータ入力キュー、第2部ではプルデータの入力キューに入れて結果を出力キューに、第3部でプル結果の出力キューに書き込もうとします。

このように見える最も基本的なパターンを並行プログラミングでは、いまだに紛失しようとする問題を解決し、 を書いてみよう標準的な事例として用マルチプロセ.

ここでは、例題:与えられ CSVファイル と列の整数を入力する事により、金額を計算.個別の問題などに、すべて並列実行:

  1. 処理の入力ファイルを生データ(リスト/iterablesの整数)
  2. 計算の金額は、データの並列
  3. 出力する金額

以下は、シングルプロセス行きのPythonプログラムを解決するこれらの課題

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

きっかけにプログラム書き換えのなかで活用することができ多重処理の並列化には部品には、上記の以下はスケルトンのこの新しい並列化プログラムのニーズを具体化への部品のコメント:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

これらのコードなど 別のコードを生成できる例をCSVファイル テスト目的で たgithub.

私は皆さまからのご意見にいらしてどう並行処理で達のうアプローチはこの問題です。


Faq(よくある質問あったかを考えた時に、この問題です。 ボーナスポイントへの対応当:

  • い子プロセスのための読書のデータを組み込むことで、キューでの主要プロセスなブロックせずにまで全ての入力です。
  • 同様に、い子プロセスの結果から、処理のキューでの主要プロセスいこと待たされることなくすべてのが多かった。
  • どちらを使うべきで プロセスプール の和が実現されるのでしょう。
  • 仮にはまだサイフォンの入出力キューにしてデータを入力していることが可能になるまで待すべての入力バッファ内のバイトを構文解析され、すべての結果を算出した(例えば、私のすべての入出力に適合できるシステムメモリ).していくべきな変化のアルゴリズムを任意の方法(例えば、走らないで他のプロセスと同時にI/O)?
役に立ちましたか?

解決

私のソリューションは、余分な鐘や笛の出力の順序が入力の順序と同じを持っていることを確認しています。私は、各プロセスがキューをチェック終了することを知っているので、停止メッセージを送信し、プロセス間でデータを送信するためにmultiprocessing.queueのを使用します。私は、ソース内のコメントは、それは私に知らせていないではなく、場合何が起こっているかクリアにするべきだと思います。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input process sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

他のヒント

パーティーに遅刻...

JOBLIB には、ループの並列を行う助けにマルチプロセッシングの上に層を有します。それはあなたの仕事の怠惰派遣し、その非常に単純な構文に加えて、よりよいエラー報告のような施設を提供しています。

免責事項として、私はJOBLIBの原作者です。

私は少し遅れてパーティーのためだと実感が、私は最近、 GNU並列を発見しましたに、それはそれで、この一般的なタスクを達成することがいかに簡単であるか示したいと思います。

cat input.csv | parallel ./sum.py --pipe > sums

このような何かがsum.pyのために行います。

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print row, ':', sum(values)

パラレルは、(もちろん、平行に)sum.pyでライン毎にinput.csvに、出力結果をsumsを実行します。明らかに、より良いmultiprocessingの手間より

まったようです。

p1.py

import csv
import pickle
import sys

with open( "someFile", "rb" ) as source:
    rdr = csv.reader( source )
    for line in eumerate( rdr ):
        pickle.dump( line, sys.stdout )

p2.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    pickle.dump( i, sum(row) )

p3.py

import pickle
import sys
while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    print i, row

このマルチプロセッシングの最終構造です。

python p1.py | python p2.py | python p3.py

あり、シェルは、ニットこれらのOSです。そうで簡単ないので非常に泳いでいる。

ありが少しオーバーヘッドを使用漬け(又はcPickle).の簡素化しようとするのはの価値は十分あります。

ご希望の場合はファイル名を引数 p1.py, そう簡単に変化します。

この機能は次のように非常に便利です。

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin )
        except EOFError:
            return

ることができること:

for item in get_stdin():
     process item

これは非常に簡単なものではありません 簡単に できる複数のコピー P2.py 走っています。

い問題がされているいくつかの新しいファンです。のP1.py からファンへの複数のP2.py's.のP2.py'sから合併その結果を単一のP3.py.

いにあり、立派な門構えの教室はアプローチをされているいくつかの新しいは"Push"建築は、あまり効果がありませんでした。

理論的には、複数のP2.py's"の引きらキューに最適な資源配分これはしばしば理想ですが、でも相当のプログラミングはプログラミングの本当に必要ですか?またはラウンドロビンを処理すると良いですか。

実質的にするP1.py う簡単に"ラウンドロビン"というテーマで複数のP2.py's場合があります。思いまP1.py 設定されるのもうれ n のコピー P2.py 名前の由る事に成功しました。のP2.py's"が各々からの読み込みに適したパイプです。

どの場合P2.py がすべての"最悪"のデータと方法。あり、ラウンドロビンは完全ではありません.もより一P2.py できるこのバイアスの簡単なランダム化.

ファンから複数のP2.py's"一P3.py 少し複雑化しているだけ。この点は、古い校のアプローチを停止している方が有利になる。P3.py ニーズからの読み込み複数の名前付きパイプを使用 select 図書館へのinterleaveの読み込み.

それだけでなく、パート1に並列処理のビットを導入するだろうことが可能です。おそらくないCSVなどの単純なようですが、入力データの処理が著しく遅くなり、データの読み出しを超える場合、あなたは「行セパレータを」(見つけるまで読むことを続行し、大きな塊を読むことができる形式での問題フォーマットの読み込みに依存再びCSVの場合は改行、しかし、形式は十分に複雑である場合は、作業をしません)。

これらのチャンク、それぞれおそらく含む複数のエントリが、その後、第2段階のためにキュー上に置かれ、次いで、平行それらは解析されているキューオフジョブを読み取る工程と、分割の群衆にオフ養殖することができます。

ライセンス: CC-BY-SA帰属
所属していません StackOverflow
scroll top