Pregunta

Tengo un muy grande (sólo lectura) matriz de datos que quiero para ser procesado por múltiples procesos en paralelo.

Me gusta la función Pool.map y me gustaría utilizarlo para calcular funciones en esos datos en paralelo.

vi que uno puede utilizar el valor o la clase Array para utilizar los datos de memoria compartida entre los procesos. Pero cuando trato de utilizar este recibo una RuntimeError: 'objetos SynchronizedString sólo deben ser compartidos entre los procesos a través de la herencia cuando se utiliza la función Pool.map:

Este es un ejemplo simplificado de lo que yo estoy tratando de hacer:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Puede alguien decirme lo que estoy haciendo mal aquí?

Entonces, ¿qué me gustaría hacer es pasar información sobre una matriz de memoria compartida asignada recién creado para los procesos después de que se haya creado en la piscina proceso.

¿Fue útil?

Solución

Tratando de nuevo como acabo de ver la generosidad;)

Básicamente creo que el mensaje de error significa lo que dijo - multiprocesamiento compartida matrices de memoria no se pueden pasar como argumentos (por decapado). No tiene sentido para serializar los datos - el punto es que los datos se comparte memoria. Por lo que tiene que hacer la matriz global compartida. Creo que es más limpio para ponerlo como el atributo de un módulo, como en mi primera respuesta, pero simplemente dejándolo como una variable global en su ejemplo también funciona bien. Tomando a bordo de su punto de no querer para establecer los datos antes de que el tenedor, aquí es un ejemplo modificado. Si quería tener más de una posible matriz compartida (y es por eso que quería pasar toshare como argumento) que podría similarmente hacer una lista global de matrices comunes, y sólo tiene que pasar el índice de count_it (que se convertiría en for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[EDIT: Lo anterior no funciona en las ventanas, porque de no usar tenedor. Sin embargo, el siguiente no funciona en Windows, sigue utilizando la piscina, así que creo que esto es lo más cercano a lo que quiere:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

No sé por qué mapa no se Conserve en vinagre la matriz, pero Proceso y piscina hará - Creo que tal vez se ha cedido en el punto de la inicialización de subproceso en las ventanas. Observe que los datos todavía se establece después de que el tenedor sin embargo.

Otros consejos

El problema que veo es que la piscina no es compatible con los datos de decapado compartida a través de la lista de parámetros. Eso es lo que significa el mensaje de error "objetos sólo deben ser compartidos entre los procesos a través de la herencia". Los datos compartidos necesita ser heredado, es decir, global si desea compartirlo con la clase piscina.

Si usted necesita para pasar de forma explícita, puede que tenga que utilizar multiprocessing.Process. Aquí está su ejemplo reelaborado:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),
  

Salida: ( 's', 9) ( 'a', 2) ( 'b', 3)   ( 'D', 12)

El orden de los elementos de la cola puede variar.

Para que esto sea más genérico y similar a la piscina, se podría crear un número fijo N de Procesos, dividir la lista de claves en N trozos, y luego usar una función de contenedor como el objetivo del proceso, la cual llamará count_it para cada tecla en la lista que se pasa, como:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

Si los datos es de sólo lectura sólo lo hacen una variable en un módulo antes de el tenedor de la piscina. A continuación, todos los procesos hijo deben ser capaces de acceder a él, y no serán copiados siempre y cuando no se escribe en él.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Si quiere tratar de usar una matriz aunque se puede tratar con el argumento de palabra clave lock=False (es cierto por defecto).

  

El módulo multiprocessing.sharedctypes proporciona funciones para la asignación de ctypes objetos de la memoria compartida que puede ser heredada por procesos secundarios.

Por lo que su uso de sharedctypes está mal. ¿Quiere hereda esta matriz de proceso padre o prefiere pasarlo explícitamente? En el primer caso hay que crear una variable global como otras respuestas sugieren. Pero no es necesario utilizar sharedctypes pasarla explícitamente, sólo tiene que pasar testData originales.

Por cierto, el uso de Pool.map() está mal. Tiene la misma interfaz que la función map() orden interna (lo hizo mal estado con starmap()?). Abajo está ejemplo de trabajo con, pasando array explícitamente:

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top