Question

I'm running a program with mpirun on 2 cores, and it gets killed at the point where I'm mixing values between the two processes. Both processes use about 15% of the machine's memory, and even though memory use will increase when mixing, there should still be plenty left. So I'm assuming that there is a limit on the amount of memory used for passing messages between the processes. How do I find out what this limit is, and how do I remove it?

The error message that I'm getting when mpirun dies is this:

File "Comm.pyx", line 864, in mpi4py.MPI.Comm.bcast (src/mpi4py.MPI.c:67787)
File "pickled.pxi", line 564, in mpi4py.MPI.PyMPI_bcast (src/mpi4py.MPI.c:31462)
File "pickled.pxi", line 93, in mpi4py.MPI._p_Pickle.alloc (src/mpi4py.MPI.c:26327)
SystemError: Negative size passed to PyBytes_FromStringAndSize

And this is the bit of the code that leads to the error:

sum_updates_j_k = numpy.zeros((self.col.J_total, self.K), dtype=numpy.float64)
comm.Reduce(self.updates_j_k, sum_updates_j_k, op=MPI.SUM) 
sum_updates_j_k = comm.bcast(sum_updates_j_k, root=0) 

The code usually works; it only runs into problems with larger amounts of data, which increase the size of the matrix that I'm exchanging between processes.


Solution

The culprit is probably the following lines found in the code of PyMPI_bcast():

cdef int count = 0
...
if dosend: smsg = pickle.dump(obj, &buf, &count)  # <----- (1)
with nogil: CHKERR( MPI_Bcast(&count, 1, MPI_INT, # <----- (2)
                              root, comm) )
cdef object rmsg = None
if dorecv and dosend: rmsg = smsg
elif dorecv: rmsg = pickle.alloc(&buf, count)
...

What happens here is that the object is first serialised at (1) using pickle.dump() and then the length of the pickled stream is broadcast at (2).

There are two problems here, and both stem from the fact that a C int is used for the length. The first is an integer cast inside pickle.dump(); the second is that MPI_INT is used to transmit the length of the pickled stream. This limits the amount of data in your matrix: the pickled object must stay below 2 GiB (at most 2^31 - 1 bytes). Anything larger overflows the integer and leaves a negative value in count, which is exactly what the "Negative size passed to PyBytes_FromStringAndSize" error reports.
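To see the wrap-around concretely, here is a minimal sketch that needs neither MPI nor mpi4py. It only mimics how a byte count is stored in a 32-bit signed integer; the helper names as_c_int and payload_bytes are made up for illustration, and the small pickle header is ignored.

```python
import ctypes

INT_MAX = 2**31 - 1  # largest value a 32-bit signed C int can hold

def as_c_int(nbytes):
    """Return nbytes as a 32-bit signed integer would store it (no overflow check)."""
    return ctypes.c_int32(nbytes).value

def payload_bytes(rows, cols, itemsize=8):
    """Raw size in bytes of a rows x cols float64 matrix (pickle header ignored)."""
    return rows * cols * itemsize

small = payload_bytes(10_000, 1_000)    # 80,000,000 bytes, fits comfortably
large = payload_bytes(30_000, 10_000)   # 2,400,000,000 bytes, above INT_MAX

print(small, as_c_int(small))  # 80000000 80000000
print(large, as_c_int(large))  # 2400000000 -1894967296
```

The second line shows the kind of negative length that ends up being passed to the allocation on the receiving side.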

This is clearly not an MPI issue but rather a bug in (or a feature of?) mpi4py.

Other tips

I had the same problem with mpi4py recently. As pointed out by Hristo Iliev in his answer, it's a pickle problem.

This can be avoided by using the upper-case methods comm.Reduce(), comm.Bcast(), etc., which operate directly on buffer-like objects such as NumPy arrays and do not resort to pickle, as opposed to lower-case methods like comm.reduce(). As a bonus, the upper-case methods should be a bit faster as well.

Actually, you're already using comm.Reduce(), so I expect that switching to comm.Bcast() should solve your problem - it did for me.

NB: The syntax of upper-case methods is slightly different, but this tutorial can help you get started.

For example, instead of:

sum_updates_j_k = comm.bcast(sum_updates_j_k, root=0) 

you would use:

comm.Bcast(sum_updates_j_k, root=0) 

For such a case it is useful to have a function that can send numpy arrays in parts, e.g.:

from mpi4py import MPI
import math, numpy, sys
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
def bcast_array_obj(obj = None, dtype = numpy.float64, root = 0):
    """Function for broadcasting of a numpy array object"""
    reporter = 0 if root > 0 else 1
    if rank == root:
        for exp in range(11):
            parts = pow(2, exp)
            err = False
            part_len = math.ceil(len(obj) / parts)
            for part in range(parts):
                part_begin = part * part_len
                part_end = min((part + 1) * part_len, len(obj))
                try:
                    comm.bcast(obj[part_begin: part_end], root = root)
                except:
                    err = True
                # Retry with more parts if either the root or the reporter rank failed
                err = comm.recv(source = reporter, tag = 2) or err
                if err:
                    break
            if err:
                continue
            comm.bcast(None, root = root)
            print('The array was successfully sent in {} part{}'.\
                  format(parts, 's' if parts > 1 else ''))
            return
        sys.stderr.write('Failed to send the array even in 1024 parts')
        sys.stderr.flush()
    else:
        obj = numpy.zeros(0, dtype = dtype)
        while True:
            err = False
            try:
                part_obj = comm.bcast(root = root)
            except:
                err = True
                obj = numpy.zeros(0, dtype = dtype)
            if rank == reporter:
                comm.send(err, dest = root, tag = 2)
            if err:
                continue
            if part_obj is not None:
                frags = len(obj)
                obj.resize(frags + len(part_obj))
                obj[frags: ] = part_obj
            else:
                break
        return obj

This function automatically determines how many parts to split the input array into: it tries 1, 2, 4, ... up to 1024 parts and stops at the first split that broadcasts successfully.

For example,

if rank != 0:
    z = bcast_array_obj(root = 0)
else:
    z = numpy.zeros(1000000000, dtype = numpy.float64)
    bcast_array_obj(z, root = 0)

outputs

The array was successfully sent in 4 parts
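Why 4 parts? A rough sketch of the arithmetic, assuming the only thing that makes a part fail is its pickled size exceeding what a 32-bit signed int can hold (pickle overhead is ignored, and parts_needed is a hypothetical helper, not part of mpi4py):

```python
INT_MAX = 2**31 - 1  # largest length a 32-bit signed C int can carry

def parts_needed(total_bytes, max_exp=10, limit=INT_MAX):
    """Mirror the doubling loop: try 1, 2, 4, ... 2**max_exp parts.

    Returns the first power of two for which a single part fits under
    `limit`, or None if even 2**max_exp parts are not enough.
    """
    for exp in range(max_exp + 1):
        parts = 2 ** exp
        part_bytes = -(-total_bytes // parts)  # ceiling division
        if part_bytes <= limit:
            return parts
    return None

# 10**9 float64 values at 8 bytes each, as in the example above
print(parts_needed(10**9 * 8))  # 4
```

With 1 or 2 parts each piece is 8 GB or 4 GB, both over the limit; with 4 parts each piece is 2,000,000,000 bytes, which just fits.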
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow