我正在尝试使用 multiprocessingPool.map() 同时分配工作的功能。当我使用以下代码时,它工作正常:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

但是,当我以更面向对象的方法使用它时,它不起作用。它给出的错误信息是:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

当我的主程序如下时会发生这种情况:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

以下是我的 someClass 班级:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

有人知道问题可能是什么,或者有简单的解决方法吗?

有帮助吗?

解决方案

问题是多处理必须腌制东西以在进程之间吊索它们,并且绑定方法不是可选择的。解决方法(无论你认为它是否“简单”;-)是将基础结构添加到程序中以允许这些方法被腌制,并使用 copy_reg 标准库方法。

例如,Steven Bethard对此主题<的贡献(在线程的末尾)显示了一个完美可行的方法,允许通过 copy_reg 进行方法pickle / unpickling。

其他提示

所有这些解决方案都很丑陋,因为除非您跳出标准库,否则多重处理和酸洗会被破坏和限制。

如果你使用叉子 multiprocessing 被称为 pathos.multiprocesssing, ,您可以直接在多处理中使用类和类方法 map 功能。这是因为 dill 被用来代替 pickle 或者 cPickle, , 和 dill 几乎可以序列化Python中的任何东西。

pathos.multiprocessing 还提供了异步映射功能......并且它可以 map 具有多个参数的函数(例如 map(math.pow, [1,2,3], [4,5,6]))

看:multiprocessing 和 dill 可以一起做什么?

和:http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

明确地说,您可以完全按照您一开始就想做的事情进行,并且如果您愿意,也可以通过解释器来完成。

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

在这里获取代码: https://github.com/uqfoundation/pathos

您还可以在 someClass()中定义 __ call __()方法,该方法调用 someClass.go()然后传递一个池中的 someClass()的实例。这个对象是pickleable,它工作正常(对我来说)......

Steven Bethard的解决方案有一些限制:

当您将类方法注册为函数时,每次方法处理完成时,都会令人惊讶地调用类的析构函数。因此,如果您的类的一个实例调用其方法的n倍,则成员可能会在两次运行之间消失,并且您可能会收到消息 malloc:***对象0x的错误...:未释放指针被释放(例如打开成员文件)或纯虚方法调用, 在没有活动异常的情况下调用终止(这意味着我使用的成员对象的生命周期比我想象的要短)。当处理大于池大小的n时,我得到了这个。这是一个简短的例子:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

输出:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__ call __ 方法不等同,因为从结果中读取了[None,...]:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

所以这两种方法都不令人满意......

您可以使用另一个捷径,但根据您的班级实例中的内容,它可能效率低下。

正如大家所说的那样,问题在于 multiprocessing 代码必须挑选它发送给它已启动的子进程的东西,并且pickler不执行实例方法。 / p>

但是,您可以将实际的类实例以及要调用的函数的名称发送给普通函数,然后使用 getattr 来调用实例 - 而不是发送实例方法。方法,从而在 Pool 子进程中创建绑定方法。这类似于定义 __ call __ 方法,除了您可以调用多个成员函数。

从他的回答中窃取了@ EricH。的代码并对其进行了一些注释(我重新输入了所有的名称更改,因此出于某些原因,这似乎比剪切和粘贴更容易:-))用于说明所有魔术:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

输出显示,实际上,构造函数被调用一次(在原始pid中)并且析构函数被调用9次(每个副本一次调用=每个pool-worker-process需要2或3次,加上一次在原始过程中)。这通常是正常的,因为在这种情况下,由于默认选择器生成整个实例的副本并且(半)秘密地重新填充它&#8212;在这种情况下,执行:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

&#8212;这就是为什么即使析构函数在三个工作进程中被调用了八次,它每次都会从1倒数到0,但当然你仍然可以通过这种方式遇到麻烦。如有必要,您可以提供自己的 __ setstate __

    def __setstate__(self, adict):
        self.count = adict['count']
例如,在这种情况下

您还可以在 someClass()中定义 __ call __()方法,该方法调用 someClass.go()然后传递一个池中的 someClass()的实例。这个对象是pickleable,它工作正常(对我来说)......

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

上面 parisjohn 的解决方案对我来说很好。此外,代码看起来干净,易于理解。在我的情况下,有一些函数可以使用Pool调用,所以我在下面修改了parisjohn的代码。我使调用能够调用多个函数,函数名称在 go()的参数dict中传递:

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

对此的一个潜在的简单解决方案是切换到使用 multiprocessing.dummy 。这是多处理接口的基于线程的实现,在Python 2.7中似乎没有这个问题。我在这里没有很多经验,但是这个快速导入更改允许我在类方法上调用apply_async。

multiprocessing.dummy 上的一些好资源:

https://docs.python.org/2 /library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-线/

在这个简单的例子中, someClass.f 不继承类中的任何数据而不将任何内容附加到类中,可能的解决方案是将 f ,所以可以腌制:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

为什么不使用单独的func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

更新:截至本文撰写之日,namedTuples可以选择(从python 2.7开始)

这里的问题是子进程无法导入对象的类 - 在这种情况下,类P-,在多模型项目的情况下,类P应该可以在子进程的任何地方导入习惯了

快速解决方法是通过将其影响为globals()

使其可导入
globals()["P"] = P
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top