How do I remove the memory limit on openmpi processes?
Question
I'm running a process with mpirun on 2 cores, and it gets killed at the point where I'm mixing values between the two processes. Each process uses about 15% of the machine's memory, and even though memory use increases during the mixing, there should still be plenty of memory left. So I'm assuming that there is a limit on the amount of memory used for passing messages between the processes. How do I find out what this limit is, and how do I remove it?
The error message that I'm getting when mpirun dies is this:
File "Comm.pyx", line 864, in mpi4py.MPI.Comm.bcast (src/mpi4py.MPI.c:67787)
File "pickled.pxi", line 564, in mpi4py.MPI.PyMPI_bcast (src/mpi4py.MPI.c:31462)
File "pickled.pxi", line 93, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:26327)
SystemError: Negative size passed to PyBytes_FromStringAndSize
And this is the bit of the code that leads to the error:
sum_updates_j_k = numpy.zeros((self.col.J_total, self.K), dtype=numpy.float64)
comm.Reduce(self.updates_j_k, sum_updates_j_k, op=MPI.SUM)
sum_updates_j_k = comm.bcast(sum_updates_j_k, root=0)
The code usually works; it only runs into problems with larger amounts of data, which increase the size of the matrix that I'm exchanging between processes.
Solution
The culprit is probably the following lines found in the code of PyMPI_bcast():
cdef int count = 0
...
if dosend: smsg = pickle.dump(obj, &buf, &count) # <----- (1)
with nogil: CHKERR( MPI_Bcast(&count, 1, MPI_INT, # <----- (2)
root, comm) )
cdef object rmsg = None
if dorecv and dosend: rmsg = smsg
elif dorecv: rmsg = pickle.alloc(&buf, count)
...
What happens here is that the object is first serialised at (1) using pickle.dump() and then the length of the pickled stream is broadcast at (2).
There are two problems here, and both stem from the fact that an int is used for the length. The first is an integer cast inside pickle.dump, and the other is that MPI_INT is used to transmit the length of the pickled stream. This limits the amount of data in your matrix to a certain size - namely, the size that would result in a pickled object no bigger than 2 GiB (2^31 - 1 bytes). Any bigger object results in an integer overflow and thus a negative value in count.
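The wrap-around is easy to demonstrate with a plain signed 32-bit integer, which is what a C int is on common platforms (a sketch using Python's ctypes, independent of MPI):

```python
import ctypes

# A pickled stream one byte longer than 2^31 - 1 no longer fits in a
# signed 32-bit C int: the length wraps around to a negative number,
# which is what ends up being passed to PyBytes_FromStringAndSize.
length = 2**31
wrapped = ctypes.c_int(length).value
print(wrapped)  # -2147483648
```

That negative length is exactly the "Negative size" reported in the traceback above.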
This is clearly not an MPI issue but rather a bug in (or a feature of?) mpi4py.
Other tips
I had the same problem with mpi4py recently. As pointed out by Hristo Iliev in his answer, it's a pickle problem.
This can be avoided by using the upper-case methods comm.Reduce(), comm.Bcast(), etc., which do not resort to pickle, as opposed to the lower-case methods like comm.reduce(). As a bonus, the upper-case methods should be a bit faster as well.
Actually, you're already using comm.Reduce(), so I expect that switching to comm.Bcast() should solve your problem - it did for me.
NB: The syntax of upper-case methods is slightly different, but this tutorial can help you get started.
For example, instead of:
sum_updates_j_k = comm.bcast(sum_updates_j_k, root=0)
you would use:
comm.Bcast(sum_updates_j_k, root=0)
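To see why only the lower-case methods are affected: comm.bcast() serialises the whole object with pickle first, and it is the length of that pickled stream that must fit in a C int, while comm.Bcast() transmits the array's underlying buffer directly. A rough sketch of the difference in payload size (pure pickle/NumPy, no MPI required):

```python
import pickle

import numpy

arr = numpy.zeros(1000, dtype=numpy.float64)

# comm.bcast() would pickle the array first; the pickled stream, with
# its extra header and metadata bytes, is what must fit in a C int.
pickled_size = len(pickle.dumps(arr))

# comm.Bcast() sends the raw buffer instead, with no pickling step
# and no 32-bit length field for the serialised object.
raw_size = arr.nbytes

print(pickled_size, raw_size)
assert pickled_size > raw_size
```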
For such a case it is useful to have a function that can send numpy
arrays in parts, e.g.:
from mpi4py import MPI
import math
import sys

import numpy

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

def bcast_array_obj(obj=None, dtype=numpy.float64, root=0):
    """Broadcast a numpy array object, doubling the number of parts
    until every part is small enough to be pickled."""
    reporter = 0 if root > 0 else 1
    if rank == root:
        for exp in range(11):
            parts = pow(2, exp)
            err = False
            part_len = math.ceil(len(obj) / parts)
            for part in range(parts):
                part_begin = part * part_len
                part_end = min((part + 1) * part_len, len(obj))
                try:
                    comm.bcast(obj[part_begin: part_end], root=root)
                except Exception:
                    err = True
                # A failure on either side means this part size is too big.
                remote_err = comm.recv(source=reporter, tag=2)
                err = err or remote_err
                if err:
                    break
            if err:
                continue
            comm.bcast(None, root=root)  # signal successful completion
            print('The array was successfully sent in {} part{}'.
                  format(parts, 's' if parts > 1 else ''))
            return
        sys.stderr.write('Failed to send the array even in 1024 parts\n')
        sys.stderr.flush()
    else:
        obj = numpy.zeros(0, dtype=dtype)
        while True:
            err = False
            try:
                part_obj = comm.bcast(root=root)
            except Exception:
                err = True
                obj = numpy.zeros(0, dtype=dtype)  # discard partial data
            if rank == reporter:
                comm.send(err, dest=root, tag=2)
            if err:
                continue
            if part_obj is not None:
                frags = len(obj)
                obj.resize(frags + len(part_obj))
                obj[frags:] = part_obj
            else:
                break
        return obj
This function automatically determines a suitable number of parts into which to break the input array.
For example,
if rank != 0:
    z = bcast_array_obj(root=0)
else:
    z = numpy.zeros(1000000000, dtype=numpy.float64)
    bcast_array_obj(z, root=0)
outputs
The array was successfully sent in 4 parts
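That figure is consistent with the 2 GiB pickle limit discussed above: 10^9 float64 values occupy 8 x 10^9 bytes, and 4 is the first power-of-two split whose parts fit under 2^31 - 1 bytes (a back-of-the-envelope check that ignores pickle's small per-part overhead):

```python
import math

INT_MAX = 2**31 - 1       # largest pickled-stream length a C int can hold
total_bytes = 10**9 * 8   # 10^9 float64 values, 8 bytes each

# Try the same power-of-two splits as bcast_array_obj: 1, 2, 4, ... parts.
for exp in range(11):
    parts = 2**exp
    part_bytes = math.ceil(total_bytes / parts)
    if part_bytes <= INT_MAX:
        break

print(parts)  # 4
```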