Non è possibile decapitare < digitare 'instancemethod' > quando si utilizza multiprocessing Pool.map ()
-
08-07-2019 - |
Domanda
Sto cercando di usare la funzione Pool.map ()
di multiprocessing
per dividere il lavoro contemporaneamente. Quando uso il seguente codice, funziona perfettamente:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
Tuttavia, quando lo uso in un approccio più orientato agli oggetti, non funziona. Il messaggio di errore che fornisce è:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
Questo si verifica quando il seguente è il mio programma principale:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
e la seguente è la mia classe someClass
:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
Qualcuno sa quale potrebbe essere il problema o un modo semplice per risolverlo?
Soluzione
Il problema è che il multiprocessing deve mettere in ordine le cose per metterle tra i processi e i metodi associati non sono selezionabili. La soluzione alternativa (che tu lo consideri "facile" o meno ;-) è aggiungere l'infrastruttura al tuo programma per consentire il decapaggio di tali metodi, registrandolo con copy_reg metodo di libreria standard.
Ad esempio, il contributo di Steven Bethard a questa discussione (verso la fine della discussione) mostra un approccio perfettamente praticabile per consentire il decapaggio / disimballaggio del metodo tramite copy_reg
.
Altri suggerimenti
Tutte queste soluzioni sono brutte perché il multiprocessing e il decapaggio sono interrotti e limitati a meno che non si salti fuori dalla libreria standard.
Se usi un fork di multiprocessing
chiamato pathos.multiprocesssing
, puoi utilizzare direttamente le classi e i metodi di classe nelle funzioni map
del multiprocessing. Questo perché dill
viene utilizzato al posto di pickle
o cPickle
e dill
può serializzare quasi tutto in Python.
pathos.multiprocessing
fornisce anche una funzione di mappa asincrona & # 8230; e può map
funzioni con più argomenti (ad es. map (math.pow, [1,2,3], [4,5,6])
)
Vedi: Cosa possono fare insieme multiprocessing e aneto?
e: http://matthewrocklin.com/blog/work/2013/ 05/12 / Parallelismo-e-serializzazione /
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
E solo per essere espliciti, puoi fare esattamente quello che volevi fare in primo luogo, e puoi farlo dall'interprete, se lo desideri.
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
Ottieni il codice qui: https://github.com/uqfoundation/pathos
Puoi anche definire un metodo __call __ ()
all'interno del tuo someClass ()
, che chiama someClass.go ()
e quindi passa un istanza di someClass ()
al pool. Questo oggetto è selezionabile e funziona benissimo (per me) ...
Alcune limitazioni alla soluzione di Steven Bethard:
Quando registri il tuo metodo di classe come funzione, il distruttore della tua classe viene sorprendentemente chiamato ogni volta che l'elaborazione del metodo è terminata. Quindi se hai 1 istanza della tua classe che chiama n volte il suo metodo, i membri potrebbero scomparire tra 2 esecuzioni e potresti ricevere un messaggio malloc: *** errore per l'oggetto 0x ...: il puntatore da liberare non è stato allocato
(ad es. file open member) o metodo virtuale puro chiamato,
termina chiamato senza un'eccezione attiva
(il che significa che la durata di un oggetto membro che ho usato era più breve di quello che pensavo). Ho ottenuto questo quando ho a che fare con n maggiore della dimensione del pool. Ecco un breve esempio:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
Output:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
Il metodo __call__
non è così equivalente, perché [None, ...] viene letto dai risultati:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
Quindi nessuno dei due metodi è soddisfacente ...
C'è un'altra scorciatoia che puoi usare, sebbene possa essere inefficiente a seconda di cosa c'è nelle tue istanze di classe.
Come tutti hanno detto, il problema è che il codice multiprocessing
deve decapare le cose che invia ai processi secondari che ha avviato e il pickler non esegue metodi di istanza.
Tuttavia, invece di inviare il metodo di istanza, puoi inviare l'istanza di classe effettiva, oltre al nome della funzione da chiamare, a una funzione ordinaria che quindi utilizza getattr
per chiamare l'istanza- , creando così il metodo associato nel sottoprocesso Pool
. Ciò è simile alla definizione di un metodo __call__
, tranne per il fatto che è possibile chiamare più di una funzione membro.
Rubando il codice di @ EricH. dalla sua risposta e annotandolo un po '(l'ho riscritto, quindi tutti i nomi cambiano e così, per qualche motivo questo sembra più facile del taglia e incolla :-)) per l'illustrazione di tutto la magia:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
L'output mostra che, in effetti, il costruttore viene chiamato una volta (nel pid originale) e il distruttore viene chiamato 9 volte (una volta per ogni copia effettuata = 2 o 3 volte per processo di pool-worker secondo necessità, più una volta nel processo originale). Questo è spesso OK, come in questo caso, poiché il pickler predefinito crea una copia dell'intera istanza e (semi) ripopola segretamente & # 8212; in questo caso, facendo:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
& # 8212; ecco perché anche se il distruttore viene chiamato otto volte nei tre processi di lavoro, conta ogni volta da 1 a 0 & # 8212; ma ovviamente puoi comunque metterti nei guai in questo modo. Se necessario, puoi fornire il tuo __setstate__
:
def __setstate__(self, adict):
self.count = adict['count']
in questo caso ad esempio.
Puoi anche definire un metodo __call __ ()
all'interno del tuo someClass ()
, che chiama someClass.go ()
e quindi passa un istanza di someClass ()
al pool. Questo oggetto è selezionabile e funziona benissimo (per me) ...
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
La soluzione di parisjohn funziona perfettamente con me. Inoltre il codice sembra pulito e di facile comprensione. Nel mio caso ci sono alcune funzioni da chiamare usando Pool, quindi ho modificato un po 'il codice di Parigi. Ho fatto chiama per essere in grado di chiamare diverse funzioni e i nomi delle funzioni sono passati nell'argomento dict da go ()
:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()
Una soluzione potenzialmente banale a questo è passare all'utilizzo di multiprocessing.dummy
. Questa è un'implementazione basata su thread dell'interfaccia multiprocessing che non sembra avere questo problema in Python 2.7. Non ho molta esperienza qui, ma questa rapida modifica dell'importazione mi ha permesso di chiamare apply_async con un metodo di classe.
Alcune buone risorse su multiprocessing.dummy
:
https://docs.python.org/2 /library/multiprocessing.html#module-multiprocessing.dummy
In questo semplice caso, in cui someClass.f
non eredita alcun dato dalla classe e non allega nulla alla classe, una possibile soluzione sarebbe quella di separare f
, quindi può essere decapato:
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
Perché non usare funzioni separate?
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
Aggiornamento: a partire dal giorno in cui scrivo, le coppie nominate sono selezionabili (a partire da Python 2.7)
Il problema qui è che i processi figlio non sono in grado di importare la classe dell'oggetto-in questo caso, la classe P-, nel caso di un progetto multi-modello la Classe P dovrebbe essere importabile ovunque il processo figlio abituati
una soluzione rapida è renderlo improprio interessandolo ai globali ()
globals()["P"] = P