一个人如何使用 多处理 要解决 尴尬的平行的问题?

Embarassingly平行的问题通常由三个基本部分:

  1. 阅读 输入数据(从文件、数据库,tcp连接,等等)。
  2. 运行 计算输入数据,其中每个计算是 独立的任何其他计算.
  3. 的计算结果(文件、数据库,tcp连接,等等)。

我们可以并行的方案在两个方面:

  • 第2部分可以运行在多个内核,由于每个计算是独立的;为处理没关系。
  • 每个部分都可以独立运行的。第1部分可以将数据输入队列,第2部分可以拉出数据输入队列并把结果向输出的队列,部分和第3部分,可拉果断输出队,并把它们写出来的。

这似乎是一个最基本的模式在并行编程,但我还失去了在试图要解决它,所以 让我们写一个典型的例子来说明如何这样做是使用多处理.

这里是例的问题:给一个 CSV文件 一排排整数作为投入,计算他们的款项。单独的问题分成三个部分,这可能有平行运行:

  1. 处理输入文件成为原始数据(表/可迭代对象为整数)
  2. 计算款项的数据,在平行的
  3. 输出的款项

下面是传统的,单程约束Python程序解决这三个任务:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

让我们采取这个程序并改写为使用多处理并行的三个部分概述了以上。下面是一个骨架这一新的,并行程序,这需要充实,以解决部分中的评论:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

这些代码,以及 另一块代码,可以产生如CSV文件 为测试目的,可以 上找到。.

我将感谢任何的洞察力在这里作为你如何并发大师会处理这个问题。


这里有一些问题,我已在考虑这个问题。 奖励点数用于解决所有:

  • 我应该有孩子的过程对于读取的数据,并把它变成队列,或者可以主进程这样做,没有阻止,直到所有输入是阅读?
  • 同样,我应该有一个孩子的过程编写的结果,从所处理的队列,或者可以主进程这样做,而不必等待的所有结果吗?
  • 我应该用一个 进程的游泳池 总和行动?
  • 假设我们不需要虹吸管输入和输出的队列为输入的数据,但是可以等到所有输入是分析和所有结果进行计算(例如,因为我们知道所有的输入和输出将适合的系统存储器)。我们应该改变的算法以任何方式(例如不执行任何进程的同时I/O)?
有帮助吗?

解决方案

我的解决方案有一个额外的贝尔和口哨,以确保顺序产出的顺序相同的输入。我使用多处理.的队列的发送数据之间的进程,发送停止信息,以便每个进程都知道放弃检查的队列。我认为评论该来源应该明确什么是怎么回事,但如果不让我知道。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input process sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

其他提示

今晚的派对...

joblib 有一个层之上的多处理,以帮助作出行为的循环。它给你的设施,如一个懒惰的派遣工作,以及更好的错误报告除了非常简单的语法。

作为免责声明,我将原作者的joblib.

我意识到我有点迟到了派对的,但我最近发现的 GNU平行, ,并希望表明它是多么容易完成这一典型的任务。

cat input.csv | parallel ./sum.py --pipe > sums

像这样的东西将做为 sum.py:

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print row, ':', sum(values)

平行会运行 sum.py 对于每一个在线 input.csv (在并行的过程),然后输出结果 sums.显然比 multiprocessing 麻烦的

旧的学校。

p1.py

import csv
import pickle
import sys

with open( "someFile", "rb" ) as source:
    rdr = csv.reader( source )
    for line in eumerate( rdr ):
        pickle.dump( line, sys.stdout )

p2.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    pickle.dump( i, sum(row) )

p3.py

import pickle
import sys
while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    print i, row

这里是多处理的最终结构。

python p1.py | python p2.py | python p3.py

是的,壳牌的针织的这些一起在操作系统的水平。它似乎更为简单,我和它的工作非常漂亮。

是的,还有略微更多的开销在使用坐(或cPickle).简化,但是,似乎值得努力。

如果你想要的文件是一个参数 p1.py, 那是一个简单的改变。

更重要的是,一个功能,如以下是非常方便。

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin )
        except EOFError:
            return

这可以让你这样做:

for item in get_stdin():
     process item

这是非常简单,但它没有 很容易 让你有多个副本P2.py 运行。

你有两个问题:扇出和风扇。的P1.py 必须以某种方式电风扇出多P2.py's。和P2.py's必须以某种方式合并,它们的结果成一个单一的P3.py.

老派方法扇出了一个"推"的架构,这是非常有效的。

从理论上讲,多P2.py's拉从一个共同的队列为最佳分配资源。这是常理想的,但它也是一个相当数量的节目。被编程的必要吗?或将循环处理是不够好?

实际上,你会发现制P1.py 做一个简单的"循环"处理其中的多P2.py's可以是相当好的。你会有P1.py 配置处理, n 副本P2.py 通过命名管道。的P2.py's每个会读他们的适当管。

如果一个P2.py 得到所有的"最坏情况"的数据和运行方式的背后?是的,循环不是完美的。但它是更好的比唯一一个P2.py 你可以解决这种偏见有简单的随机化。

风扇从多P2.py's的一个P3.py 是一个比较复杂,仍然。在这一点上,老学校的做法停止,正在有利的。P3.py 需要从多个命名管用 select 库来交织的内容。

它是大概能够介绍一位的并行成部分1。可能不是一个问题与格式,简单的如CSV,但是如果处理的输入数据是明显低于读取的数据,你可以读取较大的区块,然后继续读,直到你找到一个"行分离"(newline在CSV的情况,但再次,取决于格式阅读;不会工作如果格式是充分复杂的).

这些区块,每个可能含有多个条目,然后可以养殖一人群的平行进程阅读作业掉的队列,在那里他们却分析和分裂,然后放在排队于2阶段。

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top