Как объединить Pool.map с массивом (разделяемой памятью) в многопроцессорной обработке Python?

StackOverflow https://stackoverflow.com/questions/1675766

Вопрос

У меня есть очень большой массив данных (только для чтения), которые я хочу обрабатывать несколькими процессами параллельно.

Мне нравится функция Pool.map, и я хотел бы использовать ее для параллельного вычисления функций по этим данным.

Я видел, что можно использовать класс Value или Array для использования данных общей памяти между процессами.Но когда я пытаюсь использовать это, я получаю RuntimeError:'Объекты SynchronizedString должны совместно использоваться процессами только посредством наследования при использовании функции Pool.map:

Вот упрощенный пример того, что я пытаюсь сделать:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

Кто-нибудь может сказать мне, что я здесь делаю не так?

Итак, что я хотел бы сделать, это передать информацию о вновь созданном массиве, выделенном для общей памяти, процессам после того, как они были созданы в пуле процессов.

Это было полезно?

Решение

Пробую еще раз, только что увидел награду ;)

По сути, я думаю, что сообщение об ошибке означает то, что оно говорит: многопроцессорные массивы общей памяти не могут передаваться в качестве аргументов (путем травления).Нет смысла сериализовать данные — дело в том, что данные находятся в общей памяти.Поэтому вам нужно сделать общий массив глобальным.Я думаю, что лучше поместить его как атрибут модуля, как в моем первом ответе, но просто оставить его как глобальную переменную в вашем примере также работает хорошо.Принимая во внимание ваше мнение о нежелании устанавливать данные до разветвления, вот модифицированный пример.Если вы хотите иметь более одного возможного общего массива (и именно поэтому вы хотите передать toShare в качестве аргумента), вы можете аналогичным образом создать глобальный список общих массивов и просто передать индекс в count_it (который станет for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[РЕДАКТИРОВАТЬ:Вышеупомянутое не работает в Windows из-за отсутствия использования fork.Тем не менее, приведенное ниже работает в Windows, все еще используя пул, поэтому я думаю, что это наиболее близко к тому, что вы хотите:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

Не уверен, почему карта не будет собирать массив, а процесс и пул - я думаю, возможно, он был передан в момент инициализации подпроцесса в Windows.Обратите внимание, что данные все еще устанавливаются после разветвления.

Другие советы

Проблема, которую я вижу, заключается в том, что Pool не поддерживает обработку общих данных через свой список аргументов.Вот что означает сообщение об ошибке под "объекты должны совместно использоваться только процессами через наследование".Общие данные должны быть унаследованы, т. е. глобальными, если вы хотите предоставить им общий доступ с помощью класса пула.

Если вам нужно передать их явно, возможно, вам придется использовать многопроцессорную обработку.Обрабатывать.Вот ваш переработанный пример:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Выходной сигнал:('s', 9) ('a', 2) ('b', 3) ('d', 12)

Порядок расположения элементов очереди может варьироваться.

Чтобы сделать это более общим и похожим на Pool, вы могли бы создать фиксированное N количество процессов, разделить список ключей на N частей, а затем использовать функцию-оболочку в качестве целевого процесса, который будет вызывать count_it для каждого ключа в передаваемом списке, например:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

Если данные доступны только для чтения, просто сделайте их переменной в модуле. до вилка из Пула.Тогда все дочерние процессы смогут получить к нему доступ, и он не будет скопирован, если вы не напишете в него данные.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

Если вы хотите попробовать использовать Array, вы можете попробовать с помощью lock=False аргумент ключевого слова (по умолчанию это правда).

Тот Самый multiprocessing.sharedctypes модуль предоставляет функции для выделения объектов ctypes из общей памяти, которые могут быть унаследованы дочерними процессами.

Итак, ваше использование sharedctypes это неправильно.Вы хотите это сделать унаследовать этот массив из родительского процесса или вы предпочитаете передавать его явно?В первом случае вам нужно создать глобальную переменную, как предполагают другие ответы.Но вам не нужно использовать sharedctypes чтобы передать это явно, просто передайте original testData.

Кстати, ваше использование Pool.map() это неправильно.Он имеет тот же интерфейс, что и встроенный map() функция (вы что-то напутали с ней starmap()?).Ниже приведен рабочий пример с явной передачей массива:

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top