I'm trying to run things concurrently in my program while throttling the number of processes open at the same time (10).

from multiprocessing import Process
from threading import BoundedSemaphore

semaphore = BoundedSemaphore(10)

def f(x):
  ...  # do some work
  semaphore.release()
  print 'done'

for x in xrange(100000):
  semaphore.acquire(blocking=True)
  print 'new'
  p = Process(target=f, args=(x,))
  p.start()

The first 10 processes are launched and they finish correctly (I see ten "new" and "done" messages on the console), and then nothing. I don't see another "new"; the program just hangs there (and Ctrl-C doesn't interrupt it either). What's wrong?


Solution

Your problem is the use of threading.BoundedSemaphore across process boundaries:

import threading
import multiprocessing
import time

semaphore = threading.BoundedSemaphore(10)


def f(x):
  # Runs in the child process, which got its own copy of the semaphore.
  semaphore.release()
  print('done')


# (This demo assumes the 'fork' start method, the default on Linux.)
semaphore.acquire(blocking=True)
print('new')
print(semaphore._value)  # 9: the parent's copy was decremented
p = multiprocessing.Process(target=f, args=(100,))
p.start()
time.sleep(3)
print(semaphore._value)  # still 9: the child's release never reached the parent

When you create a new process, the child gets a copy of the parent process's memory. Thus the child releases its own copy of the semaphore, and the semaphore in the parent is untouched. (Typically, processes are isolated from each other: it takes some extra work to communicate across processes, and that is what multiprocessing is for.)

This is in contrast to threads, which all share the memory space of a single process and therefore see the same semaphore object.
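For instance, here is a quick sketch (not from the original question) of the same throttling pattern done with threads; it works as intended because every thread operates on the one shared semaphore:

import threading
import time

semaphore = threading.BoundedSemaphore(10)


def f(x):
  time.sleep(0.1)  # stand-in for some work
  semaphore.release()  # releases the very object the main thread acquired
  print('done')


for x in range(100):
  semaphore.acquire(blocking=True)  # blocks once 10 workers are alive
  print('new')
  threading.Thread(target=f, args=(x,)).start()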

multiprocessing.BoundedSemaphore is probably what you want. (If you replace threading.BoundedSemaphore with it, and replace semaphore._value with semaphore.get_value(), you'll see the output of the snippet above change.)
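For completeness, here is a minimal sketch of the original loop rewritten around multiprocessing.BoundedSemaphore (the body of f is still just a placeholder). The semaphore is passed to the child as an argument so that it is inherited under the "spawn" start method as well as under "fork":

import multiprocessing


def f(x, semaphore):
  try:
    pass  # do some work
  finally:
    semaphore.release()  # releases the shared, OS-level semaphore
    print('done')


if __name__ == '__main__':
  semaphore = multiprocessing.BoundedSemaphore(10)
  for x in range(100000):
    semaphore.acquire()  # blocks while 10 workers are still running
    print('new')
    p = multiprocessing.Process(target=f, args=(x, semaphore))
    p.start()

Releasing inside a finally block keeps a worker that raises an exception from permanently eating one of the 10 slots.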

Other tips

Your bounded semaphore is not shared properly between the various processes being spawned; you might want to switch to using a multiprocessing.BoundedSemaphore. See the answers to this question for some more details.

Licensed under: CC-BY-SA with attribution
Not affiliated with Stack Overflow