¿Cómo combinar Pool.map con Array (memoria compartida) en Python multiproceso?
-
16-09-2019 - |
Pregunta
Tengo un muy grande (sólo lectura) matriz de datos que quiero para ser procesado por múltiples procesos en paralelo.
Me gusta la función Pool.map y me gustaría utilizarlo para calcular funciones en esos datos en paralelo.
vi que uno puede utilizar el valor o la clase Array para utilizar los datos de memoria compartida entre los procesos. Pero cuando trato de utilizar este recibo una RuntimeError: 'objetos SynchronizedString sólo deben ser compartidos entre los procesos a través de la herencia cuando se utiliza la función Pool.map:
Este es un ejemplo simplificado de lo que yo estoy tratando de hacer:
from sys import stdin
from multiprocessing import Pool, Array
def count_it( arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
# this works
print count_it( toShare, "a" )
pool = Pool()
# RuntimeError here
print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Puede alguien decirme lo que estoy haciendo mal aquí?
Entonces, ¿qué me gustaría hacer es pasar información sobre una matriz de memoria compartida asignada recién creado para los procesos después de que se haya creado en la piscina proceso.
Solución
Tratando de nuevo como acabo de ver la generosidad;)
Básicamente creo que el mensaje de error significa lo que dijo - multiprocesamiento compartida matrices de memoria no se pueden pasar como argumentos (por decapado). No tiene sentido para serializar los datos - el punto es que los datos se comparte memoria. Por lo que tiene que hacer la matriz global compartida. Creo que es más limpio para ponerlo como el atributo de un módulo, como en mi primera respuesta, pero simplemente dejándolo como una variable global en su ejemplo también funciona bien. Tomando a bordo de su punto de no querer para establecer los datos antes de que el tenedor, aquí es un ejemplo modificado. Si quería tener más de una posible matriz compartida (y es por eso que quería pasar toshare como argumento) que podría similarmente hacer una lista global de matrices comunes, y sólo tiene que pasar el índice de count_it (que se convertiría en for c in toShare[i]:
).
from sys import stdin
from multiprocessing import Pool, Array, Process
def count_it( key ):
count = 0
for c in toShare:
if c == key:
count += 1
return count
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool()
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
[EDIT: Lo anterior no funciona en las ventanas, porque de no usar tenedor. Sin embargo, el siguiente no funciona en Windows, sigue utilizando la piscina, así que creo que esto es lo más cercano a lo que quiere:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule
def count_it( key ):
count = 0
for c in mymodule.toShare:
if c == key:
count += 1
return count
def initProcess(share):
mymodule.toShare = share
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool(initializer=initProcess,initargs=(toShare,))
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
No sé por qué mapa no se Conserve en vinagre la matriz, pero Proceso y piscina hará - Creo que tal vez se ha cedido en el punto de la inicialización de subproceso en las ventanas. Observe que los datos todavía se establece después de que el tenedor sin embargo.
Otros consejos
El problema que veo es que la piscina no es compatible con los datos de decapado compartida a través de la lista de parámetros. Eso es lo que significa el mensaje de error "objetos sólo deben ser compartidos entre los procesos a través de la herencia". Los datos compartidos necesita ser heredado, es decir, global si desea compartirlo con la clase piscina.
Si usted necesita para pasar de forma explícita, puede que tenga que utilizar multiprocessing.Process. Aquí está su ejemplo reelaborado:
from multiprocessing import Process, Array, Queue
def count_it( q, arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
q.put((key, count))
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
q = Queue()
keys = ['a', 'b', 's', 'd']
workers = [Process(target=count_it, args = (q, toShare, key))
for key in keys]
for p in workers:
p.start()
for p in workers:
p.join()
while not q.empty():
print q.get(),
Salida: ( 's', 9) ( 'a', 2) ( 'b', 3) ( 'D', 12)
El orden de los elementos de la cola puede variar.
Para que esto sea más genérico y similar a la piscina, se podría crear un número fijo N de Procesos, dividir la lista de claves en N trozos, y luego usar una función de contenedor como el objetivo del proceso, la cual llamará count_it para cada tecla en la lista que se pasa, como:
def wrapper( q, arr, keys ):
for k in keys:
count_it(q, arr, k)
Si los datos es de sólo lectura sólo lo hacen una variable en un módulo antes de el tenedor de la piscina. A continuación, todos los procesos hijo deben ser capaces de acceder a él, y no serán copiados siempre y cuando no se escribe en él.
import myglobals # anything (empty .py file)
myglobals.data = []
def count_it( key ):
count = 0
for c in myglobals.data:
if c == key:
count += 1
return count
if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )
Si quiere tratar de usar una matriz aunque se puede tratar con el argumento de palabra clave lock=False
(es cierto por defecto).
Por lo que su uso de sharedctypes
está mal. ¿Quiere hereda esta matriz de proceso padre o prefiere pasarlo explícitamente? En el primer caso hay que crear una variable global como otras respuestas sugieren. Pero no es necesario utilizar sharedctypes
pasarla explícitamente, sólo tiene que pasar testData
originales.
Por cierto, el uso de Pool.map()
está mal. Tiene la misma interfaz que la función map()
orden interna (lo hizo mal estado con starmap()
?). Abajo está ejemplo de trabajo con, pasando array explícitamente:
from multiprocessing import Pool
def count_it( (arr, key) ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
pool = Pool()
print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])