Domanda

Sto cercando di usare la funzione Pool.map () di multiprocessing per dividere il lavoro contemporaneamente. Quando uso il seguente codice, funziona perfettamente:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Tuttavia, quando lo uso in un approccio più orientato agli oggetti, non funziona. Il messaggio di errore che fornisce è:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Questo si verifica quando il seguente è il mio programma principale:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

e la seguente è la mia classe someClass :

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Qualcuno sa quale potrebbe essere il problema o un modo semplice per risolverlo?

È stato utile?

Soluzione

Il problema è che il multiprocessing deve mettere in ordine le cose per metterle tra i processi e i metodi associati non sono selezionabili. La soluzione alternativa (che tu lo consideri "facile" o meno ;-) è aggiungere l'infrastruttura al tuo programma per consentire il decapaggio di tali metodi, registrandolo con copy_reg metodo di libreria standard.

Ad esempio, il contributo di Steven Bethard a questa discussione (verso la fine della discussione) mostra un approccio perfettamente praticabile per consentire il decapaggio / disimballaggio del metodo tramite copy_reg .

Altri suggerimenti

Tutte queste soluzioni sono brutte perché il multiprocessing e il decapaggio sono interrotti e limitati a meno che non si salti fuori dalla libreria standard.

Se usi un fork di multiprocessing chiamato pathos.multiprocesssing , puoi utilizzare direttamente le classi e i metodi di classe nelle funzioni map del multiprocessing. Questo perché dill viene utilizzato al posto di pickle o cPickle e dill può serializzare quasi tutto in Python.

pathos.multiprocessing fornisce anche una funzione di mappa asincrona & # 8230; e può map funzioni con più argomenti (ad es. map (math.pow, [1,2,3], [4,5,6]) )

Vedi: Cosa possono fare insieme multiprocessing e aneto?

e: http://matthewrocklin.com/blog/work/2013/ 05/12 / Parallelismo-e-serializzazione /

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

E solo per essere espliciti, puoi fare esattamente quello che volevi fare in primo luogo, e puoi farlo dall'interprete, se lo desideri.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Ottieni il codice qui:   https://github.com/uqfoundation/pathos

Puoi anche definire un metodo __call __ () all'interno del tuo someClass () , che chiama someClass.go () e quindi passa un istanza di someClass () al pool. Questo oggetto è selezionabile e funziona benissimo (per me) ...

Alcune limitazioni alla soluzione di Steven Bethard:

Quando registri il tuo metodo di classe come funzione, il distruttore della tua classe viene sorprendentemente chiamato ogni volta che l'elaborazione del metodo è terminata. Quindi se hai 1 istanza della tua classe che chiama n volte il suo metodo, i membri potrebbero scomparire tra 2 esecuzioni e potresti ricevere un messaggio malloc: *** errore per l'oggetto 0x ...: il puntatore da liberare non è stato allocato (ad es. file open member) o metodo virtuale puro chiamato, termina chiamato senza un'eccezione attiva (il che significa che la durata di un oggetto membro che ho usato era più breve di quello che pensavo). Ho ottenuto questo quando ho a che fare con n maggiore della dimensione del pool. Ecco un breve esempio:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Output:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

Il metodo __call__ non è così equivalente, perché [None, ...] viene letto dai risultati:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

Quindi nessuno dei due metodi è soddisfacente ...

C'è un'altra scorciatoia che puoi usare, sebbene possa essere inefficiente a seconda di cosa c'è nelle tue istanze di classe.

Come tutti hanno detto, il problema è che il codice multiprocessing deve decapare le cose che invia ai processi secondari che ha avviato e il pickler non esegue metodi di istanza.

Tuttavia, invece di inviare il metodo di istanza, puoi inviare l'istanza di classe effettiva, oltre al nome della funzione da chiamare, a una funzione ordinaria che quindi utilizza getattr per chiamare l'istanza- , creando così il metodo associato nel sottoprocesso Pool . Ciò è simile alla definizione di un metodo __call__ , tranne per il fatto che è possibile chiamare più di una funzione membro.

Rubando il codice di @ EricH. dalla sua risposta e annotandolo un po '(l'ho riscritto, quindi tutti i nomi cambiano e così, per qualche motivo questo sembra più facile del taglia e incolla :-)) per l'illustrazione di tutto la magia:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

L'output mostra che, in effetti, il costruttore viene chiamato una volta (nel pid originale) e il distruttore viene chiamato 9 volte (una volta per ogni copia effettuata = 2 o 3 volte per processo di pool-worker secondo necessità, più una volta nel processo originale). Questo è spesso OK, come in questo caso, poiché il pickler predefinito crea una copia dell'intera istanza e (semi) ripopola segretamente & # 8212; in questo caso, facendo:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

& # 8212; ecco perché anche se il distruttore viene chiamato otto volte nei tre processi di lavoro, conta ogni volta da 1 a 0 & # 8212; ma ovviamente puoi comunque metterti nei guai in questo modo. Se necessario, puoi fornire il tuo __setstate__ :

    def __setstate__(self, adict):
        self.count = adict['count']

in questo caso ad esempio.

Puoi anche definire un metodo __call __ () all'interno del tuo someClass () , che chiama someClass.go () e quindi passa un istanza di someClass () al pool. Questo oggetto è selezionabile e funziona benissimo (per me) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

La soluzione di parisjohn funziona perfettamente con me. Inoltre il codice sembra pulito e di facile comprensione. Nel mio caso ci sono alcune funzioni da chiamare usando Pool, quindi ho modificato un po 'il codice di Parigi. Ho fatto chiama per essere in grado di chiamare diverse funzioni e i nomi delle funzioni sono passati nell'argomento dict da go () :

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

Una soluzione potenzialmente banale a questo è passare all'utilizzo di multiprocessing.dummy . Questa è un'implementazione basata su thread dell'interfaccia multiprocessing che non sembra avere questo problema in Python 2.7. Non ho molta esperienza qui, ma questa rapida modifica dell'importazione mi ha permesso di chiamare apply_async con un metodo di classe.

Alcune buone risorse su multiprocessing.dummy :

https://docs.python.org/2 /library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one- linea /

In questo semplice caso, in cui someClass.f non eredita alcun dato dalla classe e non allega nulla alla classe, una possibile soluzione sarebbe quella di separare f , quindi può essere decapato:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

Perché non usare funzioni separate?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

Aggiornamento: a partire dal giorno in cui scrivo, le coppie nominate sono selezionabili (a partire da Python 2.7)

Il problema qui è che i processi figlio non sono in grado di importare la classe dell'oggetto-in questo caso, la classe P-, nel caso di un progetto multi-modello la Classe P dovrebbe essere importabile ovunque il processo figlio abituati

una soluzione rapida è renderlo improprio interessandolo ai globali ()

globals()["P"] = P
Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top