Question

I am trying to implement a multiprocessing application which can access a shared data resource. I am using locking mechanism to make sure the shared resource is accessed safely. However I am hitting error . Surprisingly if process 1 acquires lock first it is servicing the request and it is failing on next process which is trying to acquire lock.But if some other process other than 1 is trying to acquire lock first it is failing in very first run . I am new to python and using documentation to implement this So I am unaware if I am missing any basic safety mechanisms here.Any data point as why I am witnessing this would be of great help

PROGRAM:

#!/usr/bin/python
from multiprocessing import Process, Manager, Lock
import os
import Queue
import time
lock = Lock()
def launch_worker(d,l,index):
    global lock
    lock.acquire()
    d[index] = "new"
    print "in process"+str(index)
    print d
    lock.release()
    return None

def dispatcher():
    i=1
    d={}
    mp = Manager()
    d = mp.dict()
    d[1] = "a"
    d[2] = "b"
    d[3] = "c"
    d[4] = "d"
    d[5] = "e"
    l = mp.list(range(10))
    for i in range(4):
        p = Process(target=launch_worker, args=(d,l,i))
        i = i+1
        p.start()
    return None

if __name__ == '__main__':
    dispatcher()

ERROR when process 1 is serviced first

in process0
{0: 'new', 1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e'}
Process Process-3:
Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "dispatcher.py", line 10, in launch_worker
    d[index] = "new"
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
error: [Errno 2] No such file or directory

ERROR when process 2 is serviced first

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap
    self.run()
  File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
    self._target(*self._args, **self._kwargs)
  File "dispatcher.py", line 10, in launch_worker
    d[index] = "new"
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 150, in Client
    deliver_challenge(c, authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 373, in deliver_challenge
    response = connection.recv_bytes(256)        # reject large message
IOError: [Errno 104] Connection reset by peer
Was it helpful?

Solution

The dict your workers modify is a shared object managed by the dispatching process; modifications to that object by the workers requires that they communicate with the dispatching process. The errors you see come from the fact that your dispatcher isn't waiting for the worker processes after it launches them; it's exiting too soon, so it might not exist for them to communicate with when they need to.

The first worker or two that attempts to update the shared dict might succeed, because when they modify the shared dict the process containing the Manager instance might still exist (e.g., it might still be in the process of creating further workers). Thus in your examples you see some successful output. But the managing process soon exits, and the next worker that attempts a modification will fail. (The error messages you see are typical of failed attempts at inter-process communication; you'll probably also see EOF errors if you run your program a few more times.)

What you need to do is call the join method on the Process objects as a way of waiting for each of them to exit. The following modification of your dispatcher shows the basic idea:

def dispatcher():
    mp = Manager()
    d = mp.dict()
    d[1] = "a"
    d[2] = "b"
    d[3] = "c"
    d[4] = "d"
    d[5] = "e"
    procs = []
    for i in range(4):
        p = Process(target=launch_worker, args=(d,i))
        procs.append(p)
        p.start()
    for p in procs:
        p.join()
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top