Pregunta

Tengo una pregunta relativa a Python multiproceso. Estoy tratando de tomar un conjunto de datos, romper en pedazos, y pasar los trozos de procesos se ejecutan simultáneamente. Necesito transformar grandes tablas de datos utilizando cálculos sencillos. (Por ejemplo, resistencia eléctrica -.> Temperatura para un termistor)

El código que aparece a continuación casi funciona como se desee, pero no parece ser el desove todos los procesos nuevos (o si lo que sólo uno a la vez). Soy nuevo en Python, por lo que es probable que haya una solución bastante sencilla a este problema.

Gracias de antemano!

from multiprocessing import Process

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = map(self.process, self.data)
        super(Worker, self).__init__()

if __name__ == '__main__':
    start = datetime.datetime.now()
    dataset = range(10000) # null dataset
    processes = 3

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))

        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        tmp = dataset[i * chunk : (i + 1) * chunk + remainder]
        exec('worker'+str(i)+' = Worker(tmp)')
        exec('worker'+str(i)+'.start()')

    for i in range(processes):
        exec('worker'+str(i)+'.join()')
        # just a placeholder to make sure the initial values of the set are as expected
        exec('print worker'+str(i)+'.result[0]')
¿Fue útil?

Solución

No hay necesidad de enviar el número de trozos a cada proceso, sólo tiene que utilizar get_nowait () y manejar la excepción Queue.Empty eventual. Cada proceso obtendrá diferentes cantidades de tiempo de CPU y esto debe mantener a todos ocupados.

import multiprocessing, Queue

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output):
        self.input = input
        self.output = output
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                self.output.put(self.process(self.input.get_nowait()))
        except Queue.Empty:
            pass


if name == 'main':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)
    for i in range(processes):
        Worker(input, output).start()

    for i in range(len(dataset)):
        print output.get()

Otros consejos

No ha anulado el método run. Hay dos maneras con los procesos (o hilos) para tener que ejecutar código:

  1. Crea un objetivo especificando proceso
  2. subclase el proceso, reemplazando el método run.

__init__ Anulación simplemente significa que su proceso es todos vestidos con el lugar a donde ir. Se debe utilizar para darle atributos que necesita para llevar a cabo lo que necesita para llevar a cabo, pero no debe especificar la tarea a realizar.

En el código, todo el trabajo pesado se hace en esta línea:

exec('worker'+str(i)+' = Worker(tmp)')

y no se hace nada aquí:

exec('worker'+str(i)+'.start()')

Así que la comprobación de los resultados con exec('print worker'+str(i)+'.result[0]') debe darle algo significativo, pero sólo porque el código que desee ejecutar tiene ha ejecutado, pero el proceso de construcción, no en el arranque del proceso.

Prueba esto:

class Worker(Process):
    # example data transform
    def process(self, x): return (x * 2) / 3

    def __init__(self, list):
        self.data = list
        self.result = []
        super(Worker, self).__init__()

    def run(self):
        self.result = map(self.process, self.data)

EDIT:

De acuerdo ... así que sólo estaba volando en base a mis instintos de enhebrado aquí, y todos estaban equivocados. Lo que ambos no entendemos acerca de los procesos es que no se puede compartir directamente las variables. Lo que se pasa a un nuevo proceso para empezar es leer, copiar, y se ha ido para siempre. A menos que utilice una de las dos formas estándar para compartir datos: colas y tuberías . He jugado un poco un poco tratando de obtener su código de trabajo, pero hasta ahora sin suerte. Creo que le pondrá en el camino correcto.

Ok, así que parece que la lista no se hilo de seguridad, y se han trasladado a la utilización de una cola (aunque parece ser mucho más lento). Este código realiza esencialmente lo que yo estaba tratando de hacer:

import math, multiprocessing

class Worker(multiprocessing.Process):
    def process(self, x): 
        for i in range(15):
            x += (float(i) / 2.6)
        return x

    def __init__(self, input, output, chunksize):
        self.input = input
        self.output = output
        self.chunksize = chunksize
        super(Worker, self).__init__()

    def run(self):
        for x in range(self.chunksize):
            self.output.put(self.process(self.input.get()))


if __name__ == '__main__':
    dataset = range(10)
    processes = multiprocessing.cpu_count()
    input = multiprocessing.Queue()
    output = multiprocessing.Queue()

    for obj in dataset:
        input.put(obj)

    for i in range(processes):
        chunk = int(math.floor(len(dataset) / float(processes)))
        if i + 1 == processes:
            remainder = len(dataset) % processes
        else: remainder = 0

        Worker(input, output, chunk + remainder).start()

    for i in range(len(dataset)):
        print output.get()
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top