문제

사용하려고합니다 multiprocessing'에스 Pool.map() 동시에 작업을 나누는 기능. 다음 코드를 사용하면 잘 작동합니다.

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

그러나보다 객체 지향적 인 접근 방식으로 사용하면 작동하지 않습니다. 그것이주는 오류 메시지는 다음과 같습니다.

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

이것은 다음이 나의 주요 프로그램 인 경우에 발생합니다.

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

그리고 다음은 나의 것입니다 someClass 수업:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

누구든지 문제가 무엇인지, 또는 그 주위에 쉬운 방법을 알고 있습니까?

도움이 되었습니까?

해결책

문제는 멀티 프로세싱이 프로세스 중에서 그들을 슬링하기 위해 물건을 피우고 바인딩 방법이 선택할 수 없다는 것입니다. 해결 방법 ( "쉬운"여부에 관계없이 ;-)는 프로그램에 인프라를 추가하여 그러한 방법을 절인 할 수 있도록하는 것입니다. COPY_REG 표준 라이브러리 방법.

예를 들어, Steven Bethard의 기여 이 스레드 (스레드의 끝을 향하여) 방법 절도/피킹을 통해 완벽하게 실행 가능한 하나의 접근 방식을 보여줍니다. copy_reg.

다른 팁

표준 라이브러리 밖으로 뛰어 내리지 않으면 멀티 프로세싱 및 산세가 깨지고 제한되어 있기 때문에 이러한 모든 솔루션은 추악합니다.

포크를 사용하는 경우 multiprocessing ~라고 불리는 pathos.multiprocesssing, 멀티 프로세싱에서 클래스와 클래스 방법을 직접 사용할 수 있습니다. map 기능. 이 때문입니다 dill 대신 사용됩니다 pickle 또는 cPickle, 그리고 dill 파이썬에서 거의 모든 것을 직렬화 할 수 있습니다.

pathos.multiprocessing 또한 비동기식 맵 함수를 제공합니다. map 여러 인수가있는 기능 (예 : map(math.pow, [1,2,3], [4,5,6]))

보다:다중 프로세싱과 딜은 무엇을 함께 할 수 있습니까?

그리고:http://matthewrocklin.com/blog/work/2013/12/05/parallelism-and-serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

그리고 명시 적으로, 당신은 당신이 처음에하고 싶었던 것을 정확히 원할 수 있으며, 원한다면 통역사로부터 그것을 할 수 있습니다.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

여기에서 코드를 받으십시오. https://github.com/uqfoundation/pathos

당신은 또한 a를 정의 할 수 있습니다 __call__() 당신의 내부의 방법 someClass(), 전화 someClass.go() 그리고 인스턴스를 전달합니다 someClass() 수영장에. 이 개체는 선택 가능하며 잘 작동합니다 (나를 위해) ...

Steven Bethard의 솔루션에 대한 몇 가지 제한 사항 :

클래스 메소드를 함수로 등록하면 메소드 처리가 완료 될 때마다 클래스의 소멸자가 놀랍게 호출됩니다. 따라서 수업의 1 인 인스턴스가 N Times its 메서드를 부르는 경우 회원은 2 번의 실행 사이에 사라질 수 있으며 메시지를받을 수 있습니다. malloc: *** error for object 0x...: pointer being freed was not allocated (예 : 오픈 멤버 파일) 또는 pure virtual method called, terminate called without an active exception (이것은 내가 사용한 멤버 객체의 수명보다 내가 생각한 것보다 짧았다는 것을 의미합니다). 수영장 크기보다 더 큰 N을 다룰 때 이것을 얻었습니다. 다음은 짧은 예입니다.

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

산출:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

그만큼 __call__ 없음, ...]가 결과에서 읽기 때문에 메소드는 그다지 동일하지 않습니다.

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

그래서 두 방법 중 어느 것도 만족하지 않습니다 ...

클래스 인스턴스의 내용에 따라 비효율적 일 수 있지만 사용할 수있는 또 다른 바로 컷이 있습니다.

모두가 말했듯이 문제는 multiprocessing 코드는 시작한 하위 프로세스로 보내는 것들을 피클해야하며 피클러는 인스턴스 메토드를 수행하지 않습니다.

그러나 인스턴스 메드를 보내는 대신 실제 클래스 인스턴스와 호출 할 함수 이름을 일반 함수로 보낼 수 있습니다. getattr 인스턴스 메드를 호출하려면 Pool 하위 프로세스. 이것은 정의와 유사합니다 __call__ 메소드는 둘 이상의 멤버 기능을 호출 할 수있는 것을 제외하고.

그의 대답에서 @erich.의 코드를 훔치고 그것을 조금 주석을 달았습니다 (나는 모든 이름이 변경되고 어떤 이유로 든 이것은 컷-앤 페이스트보다 더 쉬워 보였다 :-)) :

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

출력은 실제로 생성자가 원래 PID에서 한 번 (원래 PID) 호출되고 파괴자는 9 회 (각 사본마다 한 번만 제작 된마다 1 회 또는 3 회, 원본에서 한 번)를 보여줍니다. 프로세스). 기본 피클러가 전체 인스턴스의 사본을 만들고 (세미)를 비밀리에 다시 채색하기 때문에이 경우에는 다음과 같이 수행하기 때문에이 경우에는 종종 괜찮습니다.

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

- 이것이 세 명의 작업자 프로세스에서 파괴자가 8 번이라고 불려지더라도 매번 1 ~ 0으로 계산됩니다. 물론 여전히 이런 식으로 문제를 일으킬 수 있습니다. 필요한 경우 직접 제공 할 수 있습니다 __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

이 경우 예를 들어.

당신은 또한 a를 정의 할 수 있습니다 __call__() 당신의 내부의 방법 someClass(), 전화 someClass.go() 그리고 인스턴스를 전달합니다 someClass() 수영장에. 이 개체는 선택 가능하며 잘 작동합니다 (나를 위해) ...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

솔루션 파리 존 위는 나와 함께 잘 작동합니다. 또한 코드는 깨끗하고 이해하기 쉽습니다. 필자의 경우 풀을 사용하여 호출 할 몇 가지 기능이 있으므로 Parisjohn의 코드를 약간 아래로 수정했습니다. 내가 만든 전화 여러 기능을 호출 할 수 있으려면 기능 이름이 인수 Dict에서 전달됩니다. go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

이에 대한 잠재적으로 사소한 솔루션은 사용으로 전환하는 것입니다. multiprocessing.dummy. 이것은 Python 2.7 에서이 문제가없는 것으로 보이는 멀티 프로세싱 인터페이스의 스레드 기반 구현입니다. 여기에는 경험이 많지 않지만이 빠른 가져 오기 변경으로 클래스 방법에서 apply_async 호출을 할 수있었습니다.

몇 가지 좋은 자원 multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

이 간단한 경우 어디에서 someClass.f 클래스의 데이터를 상속하지 않고 클래스에 아무것도 첨부하지 않으면 가능한 해결책은 분리하는 것입니다. f, 절인 할 수 있습니다.

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

별도의 func를 사용하지 않는 이유는 무엇입니까?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

업데이트 :이 글의 날 현재, 이름은 양도 가능합니다 (Python 2.7부터 시작).

여기서 문제는 자식 프로세스가 객체의 클래스를 가져올 수 없다는 것입니다.

빠른 해결 방법은 Globals ()에 영향을 미쳐 가져올 수 있도록하는 것입니다.

globals()["P"] = P
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top