-
08-07-2019 - |
문제
사용하려고합니다 multiprocessing
'에스 Pool.map()
동시에 작업을 나누는 기능. 다음 코드를 사용하면 잘 작동합니다.
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
그러나보다 객체 지향적 인 접근 방식으로 사용하면 작동하지 않습니다. 그것이주는 오류 메시지는 다음과 같습니다.
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
이것은 다음이 나의 주요 프로그램 인 경우에 발생합니다.
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
그리고 다음은 나의 것입니다 someClass
수업:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
누구든지 문제가 무엇인지, 또는 그 주위에 쉬운 방법을 알고 있습니까?
다른 팁
표준 라이브러리 밖으로 뛰어 내리지 않으면 멀티 프로세싱 및 산세가 깨지고 제한되어 있기 때문에 이러한 모든 솔루션은 추악합니다.
포크를 사용하는 경우 multiprocessing
~라고 불리는 pathos.multiprocesssing
, 멀티 프로세싱에서 클래스와 클래스 방법을 직접 사용할 수 있습니다. map
기능. 이 때문입니다 dill
대신 사용됩니다 pickle
또는 cPickle
, 그리고 dill
파이썬에서 거의 모든 것을 직렬화 할 수 있습니다.
pathos.multiprocessing
또한 비동기식 맵 함수를 제공합니다. map
여러 인수가있는 기능 (예 : map(math.pow, [1,2,3], [4,5,6])
)
보다:다중 프로세싱과 딜은 무엇을 함께 할 수 있습니까?
그리고:http://matthewrocklin.com/blog/work/2013/12/05/parallelism-and-serialization/
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
그리고 명시 적으로, 당신은 당신이 처음에하고 싶었던 것을 정확히 원할 수 있으며, 원한다면 통역사로부터 그것을 할 수 있습니다.
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
여기에서 코드를 받으십시오. https://github.com/uqfoundation/pathos
당신은 또한 a를 정의 할 수 있습니다 __call__()
당신의 내부의 방법 someClass()
, 전화 someClass.go()
그리고 인스턴스를 전달합니다 someClass()
수영장에. 이 개체는 선택 가능하며 잘 작동합니다 (나를 위해) ...
Steven Bethard의 솔루션에 대한 몇 가지 제한 사항 :
클래스 메소드를 함수로 등록하면 메소드 처리가 완료 될 때마다 클래스의 소멸자가 놀랍게 호출됩니다. 따라서 수업의 1 인 인스턴스가 N Times its 메서드를 부르는 경우 회원은 2 번의 실행 사이에 사라질 수 있으며 메시지를받을 수 있습니다. malloc: *** error for object 0x...: pointer being freed was not allocated
(예 : 오픈 멤버 파일) 또는 pure virtual method called,
terminate called without an active exception
(이것은 내가 사용한 멤버 객체의 수명보다 내가 생각한 것보다 짧았다는 것을 의미합니다). 수영장 크기보다 더 큰 N을 다룰 때 이것을 얻었습니다. 다음은 짧은 예입니다.
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
산출:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
그만큼 __call__
없음, ...]가 결과에서 읽기 때문에 메소드는 그다지 동일하지 않습니다.
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
그래서 두 방법 중 어느 것도 만족하지 않습니다 ...
클래스 인스턴스의 내용에 따라 비효율적 일 수 있지만 사용할 수있는 또 다른 바로 컷이 있습니다.
모두가 말했듯이 문제는 multiprocessing
코드는 시작한 하위 프로세스로 보내는 것들을 피클해야하며 피클러는 인스턴스 메토드를 수행하지 않습니다.
그러나 인스턴스 메드를 보내는 대신 실제 클래스 인스턴스와 호출 할 함수 이름을 일반 함수로 보낼 수 있습니다. getattr
인스턴스 메드를 호출하려면 Pool
하위 프로세스. 이것은 정의와 유사합니다 __call__
메소드는 둘 이상의 멤버 기능을 호출 할 수있는 것을 제외하고.
그의 대답에서 @erich.의 코드를 훔치고 그것을 조금 주석을 달았습니다 (나는 모든 이름이 변경되고 어떤 이유로 든 이것은 컷-앤 페이스트보다 더 쉬워 보였다 :-)) :
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
출력은 실제로 생성자가 원래 PID에서 한 번 (원래 PID) 호출되고 파괴자는 9 회 (각 사본마다 한 번만 제작 된마다 1 회 또는 3 회, 원본에서 한 번)를 보여줍니다. 프로세스). 기본 피클러가 전체 인스턴스의 사본을 만들고 (세미)를 비밀리에 다시 채색하기 때문에이 경우에는 다음과 같이 수행하기 때문에이 경우에는 종종 괜찮습니다.
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
- 이것이 세 명의 작업자 프로세스에서 파괴자가 8 번이라고 불려지더라도 매번 1 ~ 0으로 계산됩니다. 물론 여전히 이런 식으로 문제를 일으킬 수 있습니다. 필요한 경우 직접 제공 할 수 있습니다 __setstate__
:
def __setstate__(self, adict):
self.count = adict['count']
이 경우 예를 들어.
당신은 또한 a를 정의 할 수 있습니다 __call__()
당신의 내부의 방법 someClass()
, 전화 someClass.go()
그리고 인스턴스를 전달합니다 someClass()
수영장에. 이 개체는 선택 가능하며 잘 작동합니다 (나를 위해) ...
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
솔루션 파리 존 위는 나와 함께 잘 작동합니다. 또한 코드는 깨끗하고 이해하기 쉽습니다. 필자의 경우 풀을 사용하여 호출 할 몇 가지 기능이 있으므로 Parisjohn의 코드를 약간 아래로 수정했습니다. 내가 만든 전화 여러 기능을 호출 할 수 있으려면 기능 이름이 인수 Dict에서 전달됩니다. go()
:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()
이에 대한 잠재적으로 사소한 솔루션은 사용으로 전환하는 것입니다. multiprocessing.dummy
. 이것은 Python 2.7 에서이 문제가없는 것으로 보이는 멀티 프로세싱 인터페이스의 스레드 기반 구현입니다. 여기에는 경험이 많지 않지만이 빠른 가져 오기 변경으로 클래스 방법에서 apply_async 호출을 할 수있었습니다.
몇 가지 좋은 자원 multiprocessing.dummy
:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy
이 간단한 경우 어디에서 someClass.f
클래스의 데이터를 상속하지 않고 클래스에 아무것도 첨부하지 않으면 가능한 해결책은 분리하는 것입니다. f
, 절인 할 수 있습니다.
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
별도의 func를 사용하지 않는 이유는 무엇입니까?
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
업데이트 :이 글의 날 현재, 이름은 양도 가능합니다 (Python 2.7부터 시작).
여기서 문제는 자식 프로세스가 객체의 클래스를 가져올 수 없다는 것입니다.
빠른 해결 방법은 Globals ()에 영향을 미쳐 가져올 수 있도록하는 것입니다.
globals()["P"] = P