Frage

Ich versuche multiprocessing die Pool.map() Funktion zu teilen Arbeiten gleichzeitig zu verwenden. Wenn ich den folgenden Code verwenden, es funktioniert:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

Allerdings, wenn ich es in einem objektorientierten Ansatz verwenden, funktioniert es nicht. Die Fehlermeldung gibt es lautet:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Dies geschieht, wenn die folgende mein Hauptprogramm lautet:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

und folgende meine someClass Klasse:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Wer weiß, was das Problem sein könnte, oder eine einfache Möglichkeit, um es?

War es hilfreich?

Lösung

Das Problem ist, dass Multiprozessing Dinge Beize muss sie zwischen Prozessen zu schleudern, und gebundene Methoden sind nicht picklable. Die Problemumgehung (ob Sie es „easy“ betrachten oder nicht ;-) ist die Infrastruktur zu Ihrem Programm hinzuzufügen, solche Verfahren zu ermöglichen, gebeizt werden, ist es mit der copy_reg Standardbibliothek Methode.

Zum Beispiel Steven Bethard Beitrag zum < a /> (gegen Ende des Gewindes) zeigt einen perfekt bearbeitbar Ansatz Verfahren Beizen ermöglichen / via copy_reg Unpickling.

Andere Tipps

Alle diese Lösungen sind hässlich, weil Multiprozessing und Beizen gebrochen ist und begrenzt, wenn Sie außerhalb der Standard-Bibliothek springen.

Wenn Sie eine Gabel von multiprocessing genannt pathos.multiprocesssing verwenden, können Sie direkt Klassen verwenden und Klassenmethoden in map Funktionen des Multiprocessing. Dies liegt daran, dill statt pickle oder cPickle verwendet, und dill kann fast alles in Python serialisiert werden.

pathos.multiprocessing bietet auch eine asynchrone Map-Funktion ... und es kann Funktionen mit mehreren Argumenten map (z map(math.pow, [1,2,3], [4,5,6]))

Siehe auch: Was kann zusammen Multiprozessing und Dill tun?

und: http://matthewrocklin.com/blog/work/2013/ 5.12 / Parallelism-and-Serialisierung /

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

Und nur explizit zu sein, können Sie genau das tun, wollen Sie in erster Linie tun wollten, und Sie können es vom Dolmetscher tun, wenn man es will.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Finden Sie den Code hier:   https://github.com/uqfoundation/pathos

Sie können auch eine __call__() Methode in Ihrem someClass() definieren, die someClass.go() aufrufen und dann eine Instanz von someClass() zum Pool passieren. Dieses Objekt ist pickleable und es funktioniert gut (für mich) ...

Einige Einschränkungen obwohl Steven Bethard Lösung:

Wenn Sie Ihre Klassenmethode als Funktion registrieren, wird der Destruktor der Klasse überraschend jedes Mal Ihre Methode Verarbeitung ist abgeschlossen genannt. Also, wenn Sie 1 Instanz der Klasse, die n-mal seine Methode aufruft, können die Mitglieder zwischen zwei Läufen verschwinden und Sie können eine Nachricht malloc: *** error for object 0x...: pointer being freed was not allocated (zB offene Teildatei) oder pure virtual method called, terminate called without an active exception (die als die Lebensdauer verwendete ich ein Mitglied Objekt bedeutet, erhalten wurde kürzer als das, was ich dachte). Ich habe diese, wenn sie mit n größer als die Poolgröße zu tun. Hier ist ein kurzes Beispiel:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Ausgabe:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

Die __call__ Methode ist nicht so gleichwertig, weil [None, ...] werden aus den Ergebnissen lesen:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

So keiner der beiden Methoden ist sehr befriedigend, ...

Es gibt einen weiteren Short-Cut Sie verwenden können, obwohl es ineffizient sein kann, je nachdem, was ist in Ihrer Klasse Instanzen.

Wie jeder gesagt hat, das Problem ist, dass der multiprocessing Code, um die Dinge zu beizen hat, dass sie zu den Teilprozessen sendet es begonnen hat, und die Beize nicht instanz Methoden tun.

Doch statt die Instanz-Methode senden, können Sie die tatsächliche Klasseninstanz sowie den Namen der Funktion aufzurufen, auf eine gewöhnliche Funktion senden, die dann getattr verwendet die Instanz-Methode aufrufen, so dass die gebundene Methode zu schaffen im Pool subprocess. Dies ist ähnlich wie ein __call__ Verfahren außer der Definition, dass Sie mehr als eine Memberfunktion aufrufen kann.

Stehlen @ ERICH. Code aus seiner Antwort und es ein bisschen mit Anmerkungen versehen (I abgetippt es daher alle Namensänderungen und solche, aus irgendeinem Grunde schien dies einfacher als Cut-and-Paste :-)) zur Veranschaulichung aller die Magie:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

Die Ausgabe zeigt, dass in der Tat ist der Konstruktor einmal (im Original pid) genannt und der destructor 9 Mal aufgerufen wird (einmal für jeden = 2 oder 3 Mal im Pool-Arbeiter-Prozess gemachte Kopie nach Bedarf, und einmal in dem ursprünglichen Prozess). Dies ist oft OK, wie in diesem Fall, da die Standard-Beize eine Kopie der gesamten Instanz macht und (semi-) heimlich wieder auffüllt es in diesem Fall zu tun:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

-Das ist, warum, obwohl die destructor achtmal in den drei Arbeitsprozessen genannt wird, zählt nach unten von 1 auf 0 jedes Mal, aber natürlich kann man immer noch in Schwierigkeiten auf diese Weise erhalten. Falls erforderlich, können Sie Ihre eigenen __setstate__ bieten:

    def __setstate__(self, adict):
        self.count = adict['count']

in diesem Fall zum Beispiel.

Sie können auch eine __call__() Methode in Ihrem someClass() definieren, die someClass.go() aufrufen und dann eine Instanz von someClass() zum Pool passieren. Dieses Objekt ist pickleable und es funktioniert gut (für mich) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

Die Lösung von parisjohn oben arbeitet mit mir in Ordnung. Und dazu der Code sieht sauber und leicht zu verstehen. In meinem Fall gibt es ein paar Funktionen mit Pool zu nennen, so dass ich modifizierte parisjohn der Code ein bisschen unten. Ich habe Anruf können mehrere Funktionen aufrufen und die Funktionsnamen werden im Argument dict von go() übergeben:

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

Eine potentiell triviale Lösung dieses Problem ist die Verwendung von multiprocessing.dummy zu wechseln. Dies ist ein Thread-basierte Implementierung der Multiprozessor-Schnittstelle, die dieses Problem nicht zu haben, in Python 2.7 scheinen. Ich habe nicht viel Erfahrung hier habe, aber die schnelle Import Änderung erlaubte mich auf einer Klassenmethode zu nennen apply_async.

Ein paar gute Ressourcen auf multiprocessing.dummy:

https://docs.python.org/2 /library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one- line /

In diesem einfachen Fall, wo someClass.f wird alle Daten aus der Klasse nicht erben und nichts an den jeweiligen Kategorie von Anteilen, eine mögliche Lösung wäre f zu trennen, so dass es gebeizt werden kann:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

Warum nicht trennen func zu benutzen?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

Update: ab dem Tag des Schreibens dieses Artikels, namedTuples aufsammelbare sind (beginnend mit Python 2.7)

Das Problem hier ist die Kindprozesse, die Klasse des Objekts zu importieren -in diesem Fall nicht in der Lage sind, die Klasse P-, im Falle eines Multi-Modell-Projekt sollte die Klasse P überall das Kind Prozess importierbar sein gewöhnungs

eine schnelle Abhilfe ist es importierbar zu machen, indem es auf Globals zu beeinflussen ()

globals()["P"] = P
Lizenziert unter: CC-BY-SA mit Zuschreibung
Nicht verbunden mit StackOverflow
scroll top