Pregunta

Estoy tratando de usar la función multiprocesamiento de Pool.map () para dividir el trabajo simultáneamente. Cuando uso el siguiente código, funciona bien:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Sin embargo, cuando lo uso en un enfoque más orientado a objetos, no funciona. El mensaje de error que da es:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Esto ocurre cuando el siguiente es mi programa principal:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

y la siguiente es mi clase someClass :

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

¿Alguien sabe cuál podría ser el problema, o una forma fácil de solucionarlo?

¿Fue útil?

Solución

El problema es que el multiprocesamiento debe enredar las cosas para colocarlas entre los procesos, y los métodos enlazados no son seleccionables. La solución (ya sea que lo considere "fácil" o no ;-) es agregar la infraestructura a su programa para permitir que dichos métodos sean encurtidos, registrándolo con copy_reg método de biblioteca estándar.

Por ejemplo, la contribución de Steven Bethard a este hilo (hacia el final del hilo) muestra un enfoque perfectamente viable para permitir el método de decapado / desempaquetado a través de copy_reg .

Otros consejos

Todas estas soluciones son feas porque el multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.

Si usa una bifurcación de multiprocesamiento llamada pathos.multiprocesssing , puede usar directamente clases y métodos de clase en las funciones de map de multiprocesamiento. Esto se debe a que se usa eneldo en lugar de pickle o cPickle , y dill puede serializar casi cualquier cosa en Python.

pathos.multiprocessing también proporciona una función de mapa asíncrono ... y puede map funciones con múltiples argumentos (por ejemplo, map (math.pow, [1,2 , 3], [4,5,6]) )

Ver: ¿Qué pueden hacer juntos el multiprocesamiento y el eneldo?

y: http://matthewrocklin.com/blog/work/2013/ 12/05 / Paralelismo y serialización /

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Y para ser explícito, puede hacer exactamente lo que quería hacer en primer lugar, y puede hacerlo desde el intérprete, si así lo desea.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Obtenga el código aquí:   https://github.com/uqfoundation/pathos

También puede definir un método __call __ () dentro de su someClass () , que llama a someClass.go () y luego pasa un instancia de someClass () al grupo. Este objeto es pickleable y funciona bien (para mí) ...

Algunas limitaciones para la solución de Steven Bethard:

Cuando registra su método de clase como una función, sorprendentemente se llama al destructor de su clase cada vez que finaliza el procesamiento de su método. Entonces, si tiene 1 instancia de su clase que llama n veces su método, los miembros pueden desaparecer entre 2 ejecuciones y puede recibir un mensaje malloc: *** error para el objeto 0x ...: el puntero que se está liberando no se asignó (por ejemplo, archivo de miembro abierto) o método virtual puro llamado, terminar llamado sin una excepción activa (lo que significa que la vida útil de un objeto miembro que usé fue más corta de lo que pensaba). Obtuve esto cuando se trata de n mayor que el tamaño del grupo. Aquí hay un breve ejemplo:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Salida:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

El método __call__ no es tan equivalente, porque [Ninguno, ...] se lee de los resultados:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Entonces ninguno de los dos métodos es satisfactorio ...

Hay otro atajo que puedes usar, aunque puede ser ineficiente dependiendo de lo que hay en tus instancias de clase.

Como todos han dicho, el problema es que el código multiprocessing tiene que seleccionar las cosas que envía a los subprocesos que ha iniciado, y el selector no hace métodos de instancia.

Sin embargo, en lugar de enviar el método de instancia, puede enviar la instancia de clase real, más el nombre de la función a llamar, a una función ordinaria que luego usa getattr para llamar a la instancia- , creando así el método enlazado en el subproceso Pool . Esto es similar a definir un método __call__ , excepto que puede llamar a más de una función miembro.

Robar el código de @ EricH. de su respuesta y anotarlo un poco (lo reescribí, por lo tanto, todos los cambios de nombre y demás, por alguna razón, esto parecía más fácil que cortar y pegar :-)) para ilustrar todo la magia:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

El resultado muestra que, de hecho, el constructor se llama una vez (en el pid original) y el destructor se llama 9 veces (una vez por cada copia realizada = 2 o 3 veces por proceso de trabajador de grupo según sea necesario, más una vez en el proceso original). Esto a menudo está bien, como en este caso, ya que el selector predeterminado hace una copia de toda la instancia y (semi) secretamente la vuelve a llenar & # 8212; en este caso, haciendo:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

& # 8212; es por eso que a pesar de que el destructor se llama ocho veces en los tres procesos de trabajo, cuenta regresivamente de 1 a 0 cada vez & # 8212; pero, por supuesto, aún puede meterse en problemas de esta manera. Si es necesario, puede proporcionar su propio __setstate__ :

    def __setstate__(self, adict):
        self.count = adict['count']

en este caso, por ejemplo.

También puede definir un método __call __ () dentro de su someClass () , que llama a someClass.go () y luego pasa un instancia de someClass () al grupo. Este objeto es pickleable y funciona bien (para mí) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

La solución de parisjohn anterior funciona bien conmigo. Además, el código se ve limpio y fácil de entender. En mi caso, hay algunas funciones para llamar usando Pool, por lo que modifiqué el código de parisjohn un poco más abajo. Hice call para poder llamar a varias funciones, y los nombres de las funciones se pasan en el argumento dict de go () :

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

Una solución potencialmente trivial para esto es cambiar a usar multiprocessing.dummy . Esta es una implementación basada en hilos de la interfaz de multiprocesamiento que no parece tener este problema en Python 2.7. No tengo mucha experiencia aquí, pero este cambio rápido de importación me permitió llamar a apply_async en un método de clase.

Algunos buenos recursos en multiprocessing.dummy :

https://docs.python.org/2 /library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one- línea /

En este caso simple, donde someClass.f no hereda ningún dato de la clase y no adjunta nada a la clase, una posible solución sería separar f , para que se pueda encurtir:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

¿Por qué no utilizar funciones separadas?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

Actualización: a partir del día de este escrito, las Tuplas nombradas son seleccionables (comenzando con python 2.7)

El problema aquí es que los procesos secundarios no pueden importar la clase del objeto, en este caso, la clase P-, en el caso de un proyecto de modelos múltiples, la Clase P debería ser importable en cualquier lugar del proceso secundario acostumbrarse

una solución rápida es hacer que sea importante al afectarlo a globals ()

globals()["P"] = P
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top