できない漬け <type 'instancemethod'=""> 利用時マルチプロセプールがあります。地図()
-
08-07-2019 - |
質問
私は利用しよう multiprocessing
's Pool.map()
機能分け作業を同時にご利用する場合は、以下のコードを動作させることができァ
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
しかし、使用することによりオブジェクト指向アプローチするものではありません。エラーメッセージでは:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
この時は以下が私の主なプログラム:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
およびそれだけではないですが、もう someClass
クラス:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
誰かの問題役割を果たすことができるのは簡単うのですか?
解決
問題は、マルチプロセッシングがそれらをプロセス間でスリングするためにピクルスする必要があり、バインドされたメソッドがピクル可能でないことです。回避策(「簡単」と見なすかどうかにかかわらず)は、インフラストラクチャをプログラムに追加して、そのようなメソッドをピクルできるようにし、 copy_reg 標準ライブラリメソッド。
たとえば、このスレッドへのSteven Bethardの貢献(スレッドの終わりに向かって)は、 copy_reg
を介したメソッドのpickle / unpicklingを可能にする1つの完全に実行可能なアプローチを示しています。
他のヒント
すべてのこれらのソリューションは、醜いので多重処理、酸洗では、限られない限り、ジャンプの外には標準図書館があります。
ご利用の場合、フォークの multiprocessing
という pathos.multiprocesssing
, は、直接利用できる授業やクラスメソッドでのマルチプロセ map
ます。これは、 dill
が使用されているの pickle
または cPickle
, は、 dill
でserializeほとんど何もエラーになります。
pathos.multiprocessing
また非同期マップ機能で... map
機能複数の引数(例えば map(math.pow, [1,2,3], [4,5,6])
)
参照:何ができるマルチプロセとディルな相互の関連性を検討した。
と:http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
とされる方もいるでしょう明示的にいっていただきたいと思ったのは最初に行うことができるので通訳者からばっているのです。
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
コードを取得します。 https://github.com/uqfoundation/pathos
someClass.go()
を呼び出す someClass()
内で __ call __()
メソッドを定義してから、プールへの someClass()
のインスタンス。このオブジェクトはピクル可能であり、(私にとっては)正常に動作します...
Steven Bethardのソリューションに対するいくつかの制限:
クラスメソッドを関数として登録すると、メソッド処理が終了するたびにクラスのデストラクタが驚くほど呼び出されます。そのため、メソッドをn回呼び出すクラスのインスタンスが1つある場合、2回の実行の間にメンバーが消え、メッセージ malloc:*** error for object 0x ...:freeing being free was notlocated
(たとえば、メンバーファイルを開く)または pure virtual method called
アクティブな例外なしで呼び出された終了
(これは、使用したメンバーオブジェクトの寿命が思ったよりも短いことを意味します)。プールサイズよりも大きいnを扱うときにこれを取得しました。以下に短い例を示します:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
出力:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
__ call __
メソッドは、[None、...]が結果から読み取られるため、それほど等価ではありません:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
つまり、どちらの方法も満足のいくものではありません...
使用できるショートカットはもう1つありますが、クラスインスタンスの内容によっては効率が悪い場合があります。
誰もが言ったように、問題は multiprocessing
コードが開始したサブプロセスに送信するものをピクルスする必要があり、ピッカーはインスタンスメソッドを実行しないことです。
ただし、instance-methodを送信する代わりに、実際のクラスインスタンスと、呼び出す関数の名前を通常の関数に送信し、次に getattr
を使用してインスタンスを呼び出します。したがって、 Pool
サブプロセスでバインドされたメソッドを作成します。これは __ call __
メソッドの定義と似ていますが、複数のメンバー関数を呼び出すことができる点が異なります。
彼の答えから@EricH。のコードを盗み、少し注釈を付けました(すべての名前が変更されたため、何らかの理由で、カットアンドペーストより簡単に見えました:-))魔法:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
出力は、実際には、コンストラクターが(元のpidで)1回呼び出され、デストラクタが9回呼び出されることを示しています(作成されたコピーごとに1回=必要に応じてpool-worker-processごとに2または3回、さらに1回元のプロセスで)。デフォルトのピッカーはインスタンス全体のコピーを作成し、(半)密かにそれを再投入するため、この場合のように、これは多くの場合問題ありません&#8212;この場合、次のようにします:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
&#8212;そのため、3つのワーカープロセスでデストラクタが8回呼び出されても、毎回1から0にカウントダウンしますが、もちろんこの方法で問題が発生する可能性があります。必要に応じて、独自の __ setstate __
を提供できます:
def __setstate__(self, adict):
self.count = adict['count']
たとえばこの場合。
someClass.go()
を呼び出す someClass()
内で __ call __()
メソッドを定義してから、プールへの someClass()
のインスタンス。このオブジェクトはピクル可能であり、(私にとっては)正常に動作します...
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
上記の parisjohn からのソリューションは、私と一緒にうまく機能します。さらに、コードは簡潔で理解しやすいように見えます。私の場合、Poolを使用して呼び出す関数がいくつかあるため、パリジョンのコードを少し下に変更しました。複数の関数を呼び出すことができるように call を作成し、関数名は go()
から引数dictで渡されます:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()
これに対する潜在的に些細な解決策は、 multiprocessing.dummy
の使用に切り替えることです。これは、マルチプロセッシングインターフェイスのスレッドベースの実装であり、Python 2.7ではこの問題は発生していないようです。ここではあまり経験がありませんが、この簡単なインポートの変更により、クラスメソッドでapply_asyncを呼び出すことができました。
multiprocessing.dummy
の優れたリソース:
https://docs.python.org/2 /library/multiprocessing.html#module-multiprocessing.dummy
someClass.f
がクラスからデータを継承せず、クラスに何もアタッチしないこの単純なケースでは、可能な解決策は f
、それでピクルスにすることができます:
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
別のfuncを使用しない理由
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
更新:この記事の執筆時点で、namedTuplesは選択可能です(python 2.7以降)
ここでの問題は、子プロセスがオブジェクトのクラス(この場合はクラスP)をインポートできないことです。マルチモデルプロジェクトの場合、クラスPは子プロセスのどこでもインポート可能でなければなりません慣れる
簡単な回避策は、globals()に影響を与えてインポート可能にすることです
globals()["P"] = P