Python 다중 처리에서 Pool.map을 배열(공유 메모리)과 결합하는 방법은 무엇입니까?

StackOverflow https://stackoverflow.com/questions/1675766

문제

여러 프로세스에서 병렬로 처리하려는 매우 큰(읽기 전용) 데이터 배열이 있습니다.

저는 Pool.map 함수를 좋아하며 이 함수를 사용하여 해당 데이터에 대한 함수를 병렬로 계산하고 싶습니다.

프로세스 간에 공유 메모리 데이터를 사용하기 위해 Value 또는 Array 클래스를 사용할 수 있다는 것을 알았습니다.하지만 이것을 사용하려고 하면 RuntimeError가 발생합니다.'SynchronizedString 객체는 Pool.map 함수를 사용할 때 상속을 통해 프로세스 간에만 공유되어야 합니다.

다음은 내가 하려는 작업에 대한 간단한 예입니다.

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

내가 여기서 뭘 잘못하고 있는지 말해 줄 수 있는 사람이 있나요?

그래서 내가 하고 싶은 것은 새로 생성된 공유 메모리 할당 배열에 대한 정보를 프로세스 풀에서 생성된 후 프로세스에 전달하는 것입니다.

도움이 되었습니까?

해결책

바운티를 방금 보았을 때 다시 시도;)

기본적으로 오류 메시지는 말한 것을 의미한다고 생각합니다. 공유 메모리 배열을 멀티 프로세싱하는 것은 인수로 전달할 수 없습니다 (산세). 데이터를 시리얼링하는 것은 의미가 없습니다. 요점은 데이터가 공유 메모리입니다. 따라서 공유 배열을 글로벌로 만들어야합니다. 첫 번째 답변에서와 같이 모듈의 속성으로 넣는 것이 깔끔한 것 같지만 예제에서 글로벌 변수로 남겨 두는 것도 잘 작동합니다. 포크 전에 데이터를 설정하고 싶지 않다는 점을 맡고 있습니다. 여기에 수정 된 예가 있습니다. 둘 이상의 가능한 공유 배열을 원한다면 (그리고 Toshare를 인수로 통과시키고 싶었을 경우) 유사하게 공유 배열의 글로벌 목록을 만들고 인덱스를 count_it에 전달할 수 있습니다 ( for c in toShare[i]:).

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

편집 : 포크를 사용하지 않기 때문에 위의 내용은 Windows에서 작동하지 않습니다. 그러나 아래는 Windows에서 작동하며 여전히 풀을 사용하므로 이것이 원하는 것과 가장 가깝다고 생각합니다.

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

왜 맵이 배열을 피울 수 없지만 프로세스와 풀이 될지 확실하지 않습니다. 아마도 Windows의 하위 프로세스 초기화 시점에서 전송되었다고 생각합니다. 그래도 데이터는 포크 이후에도 여전히 설정되어 있습니다.

다른 팁

내가 본 문제는 Pool이 인수 목록을 통해 공유 데이터 피클링을 지원하지 않는다는 것입니다.이것이 바로 "객체는 상속을 통해 프로세스 간에만 공유되어야 합니다"라는 오류 메시지의 의미입니다.공유 데이터는 상속되어야 합니다. 즉, Pool 클래스를 사용하여 공유하려면 전역이어야 합니다.

명시적으로 전달해야 하는 경우 multiprocessing.Process를 사용해야 할 수도 있습니다.재작업된 예는 다음과 같습니다.

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

산출:( 's', 9) ( 'a', 2) ( 'b', 3) ( 'd', 12)

대기열 요소의 순서는 다양할 수 있습니다.

이를 보다 일반적이고 풀과 유사하게 만들려면 고정된 N 수의 프로세스를 만들고 키 목록을 N 조각으로 분할한 다음 래퍼 함수를 ​​프로세스 대상으로 사용하여 목록의 각 키에 대해 count_it를 호출할 수 있습니다. 다음과 같이 전달됩니다.

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

데이터 만 읽기 만하면 모듈의 변수로 만듭니다. ~ 전에 수영장에서 포크. 그러면 모든 자식 프로세스에 액세스 할 수 있어야하며 글을 쓰지 않는 경우 복사되지 않습니다.

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

배열을 사용하려고하면 lock=False 키워드 인수 (기본적으로 사실입니다).

그만큼 multiprocessing.sharedctypes 모듈은 아동 프로세스에 의해 상속 될 수있는 공유 메모리에서 CTypes 객체를 할당하는 기능을 제공합니다.

그래서 당신의 사용 sharedctypes 잘못되었습니다. 당신은 원하십니까? 상속 부모 프로세스 의이 배열입니까, 아니면 명시 적으로 전달하는 것을 선호합니까? 전자의 경우 다른 답변이 제안한대로 글로벌 변수를 만들어야합니다. 그러나 사용할 필요는 없습니다 sharedctypes 명시 적으로 통과하려면 원본을 전달하십시오 testData.

BTW, 귀하의 사용 Pool.map() 잘못되었습니다. 내장과 동일한 인터페이스가 있습니다 map() 기능 (당신은 그것을 엉망으로 만들었습니까? starmap()?). 아래는 어레이를 명시 적으로 전달하는 예제입니다.

from multiprocessing import Pool

def count_it( (arr, key) ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    pool = Pool()
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]])
라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top