Pregunta

¿Cómo hace un uso multiprocesamiento para hacer frente a problemas vergonzosamente paralelas ?

problemas embarazosamente paralelos típicamente constan de tres partes básicas:

  1. Leer los datos de entrada (a partir de un archivo, base de datos, conexión TCP, etc.).
  2. Ejecutar los cálculos sobre los datos de entrada, donde cada cálculo es independiente de cualquier otro cálculo .
  3. Comentario resultados de los cálculos (en un archivo, base de datos, conexión TCP, etc.).

puede paralelizar el programa en dos dimensiones:

  • Parte 2 puede ejecutarse en múltiples núcleos, ya que cada cálculo es independiente; orden de procesamiento no importa.
  • Cada parte puede funcionar de forma independiente. Parte 1 puede colocar datos en una cola de entrada, parte 2 puede tirar de datos fuera de la cola de entrada y los resultados de poner en una cola de salida, y la parte 3 puede tirar de los resultados de la cola de salida y escribir a cabo.

Esto parece un patrón más básico en la programación concurrente, pero todavía estoy perdido en tratar de resolverlo, por lo que Vamos a escribir un ejemplo canónico para ilustrar cómo se hace esto usando multiprocesamiento .

Aquí está el problema de ejemplo: Dado un archivo CSV con filas de números enteros como entrada , calcular sus sumas. Separar el problema en tres partes, que pueden correr todos en paralelo:

  1. Procesar el archivo de entrada en datos en bruto (listas / iterables de números enteros)
  2. Calcular las sumas de los datos, en paralelo
  3. Salida las sumas

A continuación el programa Python está obligado, de un solo proceso tradicional que resuelve estas tres tareas:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

Vamos a tomar este programa y volver a escribir para que use multiprocesamiento para paralelizar las tres partes mencionadas anteriormente. A continuación se muestra un esqueleto de este nuevo programa, parallelized, que las necesidades de dar cuerpo a hacer frente a las partes en los comentarios:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

Estas piezas de código, así como otra pieza de código que puede generar ejemplo Los archivos CSV para propósitos de prueba, puede ser encontrado en github .

Le agradecería cualquier idea de aquí en cuanto a cómo los gurús de concurrencia se acercaría a este problema.


Aquí están algunas preguntas que tenía al pensar en este problema Los puntos de bonificación para hacer frente a cualquier / todo:.

  • ¿Debo tener procesos secundarios para la lectura de los datos y de colocarlo en la cola, o puedo hacer esto sin el bloqueo hasta que se lea todo el proceso de entrada principal?
  • Del mismo modo, debería tener un proceso hijo para escribir los resultados a partir de la cola de procesado, o puedo hacer esto sin tener que esperar a que todos los resultados del proceso principal?
  • ¿Debería usar un procesos piscina para las operaciones de suma ?
  • Supongamos que no había necesidad de desviar las colas de entrada y salida de datos como los escriba, pero podría esperar hasta que todas las entradas se analiza y se calcularon todos los resultados (por ejemplo, porque sabemos que toda la entrada y la salida se ajuste en el sistema de memoria). ¿Hay que cambiar el algoritmo de cualquier manera (por ejemplo, no ejecutar cualquier proceso simultáneamente con I / O)?
¿Fue útil?

Solución

Mi solución tiene un timbre adicional y un silbato para asegurarse de que el orden de la salida tiene el mismo que el orden de la entrada. Yo uso de multiprocessing.queue para enviar datos entre los procesos, el envío de mensajes de detención por lo que cada proceso de dejar de fumar sabe comprobar las colas. Creo que los comentarios en la fuente debe dejar claro lo que está pasando, pero si no me deja saber.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
            """Parses the input CSV and yields tuples with the index of the row
            as the first element, and the integers of the row as the second
            element.

            The index is zero-index based.

            The data is then sent over inqueue for the workers to do their
            thing.  At the end the input process sends a 'STOP' message for each
            worker.
            """
            for i, row in enumerate(self.in_csvfile):
                row = [ int(entry) for entry in row ]
                self.inq.put( (i, row) )

            for i in range(self.numprocs):
                self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
                self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers
        Since I chose to make sure output was synchronized to the input there
        is some extra goodies to do that.

        Obviously your input has the original row number so this is not
        required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes so open/close
        # and use it all in the same process or else you'll have the last
        # several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        #Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    #if yes are write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")

    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])

Otros consejos

El venir tarde a la fiesta ...

JOBLIB tiene una capa en la parte superior de multiprocesamiento para ayuda para hacer paralelo para bucles. Se le da facilidades como un despacho perezoso de puestos de trabajo, y mejor información de errores, además de su sintaxis muy sencilla.

Como un descargo de responsabilidad, yo soy el autor original de JOBLIB.

Me doy cuenta de que estoy un poco tarde a la fiesta, pero recientemente he descubierto GNU paralelo , y quieren mostrar lo fácil que es llevar a cabo esta tarea típica con él.

cat input.csv | parallel ./sum.py --pipe > sums

Algo como esto va a hacer por sum.py:

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print row, ':', sum(values)

Parallel se ejecutará sum.py por cada línea en input.csv (en paralelo, por supuesto), entonces la salida de los resultados a sums. Claramente mejor que molestia multiprocessing

Old School.

p1.py

import csv
import pickle
import sys

with open( "someFile", "rb" ) as source:
    rdr = csv.reader( source )
    for line in eumerate( rdr ):
        pickle.dump( line, sys.stdout )

p2.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    pickle.dump( i, sum(row) )

p3.py

import pickle
import sys
while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    print i, row

Este es el procesamiento de múltiples-estructura final.

python p1.py | python p2.py | python p3.py

Sí, la cáscara ha unido estos juntos a nivel de sistema operativo. Parece más sencillo para mí y funciona muy bien.

Sí, hay un poco más de riesgo en el uso de salmuera (o cPickle). La simplificación, sin embargo, parece vale la pena el esfuerzo.

Si desea que el nombre de archivo para ser un argumento a p1.py, que es un cambio fácil.

Más importante aún, una función como la siguiente es muy práctico.

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin )
        except EOFError:
            return

Esto le permite hacer esto:

for item in get_stdin():
     process item

Esto es muy simple, pero no lo hace fácilmente le permiten tener múltiples copias de P2.py en ejecución.

Existen dos problemas: fan-out y el ventilador de entrada. El P1.py debe de alguna manera se abren en abanico a múltiples P2.py de. Y la necesidad de la P2.py alguna manera fusionar sus resultados en una sola P3.py.

El enfoque de la vieja escuela a fan-out es una arquitectura "push", que es muy eficaz.

En teoría, múltiple de tirar de una cola común es la asignación óptima de los recursos P2.py. Esto a menudo es ideal, pero también es una buena cantidad de programación. Es la programación realmente necesario? ¿O round-robin procesamiento ser lo suficientemente bueno?

En la práctica, se puede encontrar que hacer P1.py hacer un simple "round robin" tratar los múltiples P2.py de puede ser bastante bueno. Tendrías P1.py configurado para hacer frente a n copias de P2.py a través de canalizaciones con nombre. El P2.py de cada habría leído de su tubería apropiada.

¿Qué pasa si uno P2.py recibe toda la información del "caso peor" y corre muy por detrás? Sí, round-robin no es perfecto. Pero es mejor que sólo un P2.py y se puede abordar este sesgo con una simple asignación al azar.

abanico de entrada de múltiples P2.py de uno a P3.py es un poco más compleja, todavía. En este punto, el enfoque de la vieja escuela deja de ser ventajosa. P3.py necesita leer desde múltiples canalizaciones con nombre utilizando la biblioteca select para intercalar las lecturas.

Es probable que sea posible introducir un poco de paralelismo en la parte 1 también. Probablemente no es un problema con un formato que es tan simple como CSV, pero si el procesamiento de los datos de entrada es notablemente más lento que la lectura de los datos, se podía leer trozos más grandes, y luego continuar a leer hasta que encuentre un "separador de fila" ( salto de línea en el caso CSV, pero de nuevo que depende del formato de lectura; no funciona si el formato es lo suficientemente complejo).

Estos trozos, cada uno, probablemente contienen varias entradas, a continuación, puede ser cultivada fuera a una multitud de procesos paralelos de lectura puestos de trabajo fuera de una cola, donde son analizados y se dividió, luego se coloca en la cola-in para la etapa 2.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top