Question

I have a program I want to parallelize using MPI. I have not worked with MPI before.

The program calculates the behavior of a large number of objects over time. The data for these objects is stored in arrays, e.g. double precision :: body_x(10000) for the x coordinates.

To calculate the behavior of an object, the information about all other objects is needed, so every thread needs to hold all the data but will only update a portion of it. Before the new timestep, every thread needs to get the updated information from all other threads.

As I understand it, MPI_Allgather could be used for this, but it needs a send buffer and a receive buffer. How can I synchronize an array across different threads if each thread updated a different part of it? Do I have to send the whole array from each thread to the master into a receive buffer, update the corresponding part of the master's array, and, after all threads have sent their data, re-broadcast from the master?

This is a pretty basic question, but I'm very new to MPI and all examples I found are pretty simple and do not cover this. Thanks for any help.

Pseudo-example (assuming Fortran-style arrays with first index 1; yes, the send/receive would better be done non-blocking, this is just for the sake of simplicity):

if (master) then
   readInputFile
end if

MPI_Bcast(numberOfObject)

allocate body_arrays(numberOfObjects)

if (master) then
   fill body_arrays ! with the data from the input file
end if

MPI_Bcast(body_arrays)

objectsPerThread = numberOfObjects / threadCount

myStart = threadID * objectsPerThread + 1
myEnd   = (threadID + 1) * objectsPerThread

do while (t < t_end)

   do i = myStart, myEnd
      do stuff for body_arrays(i)
   end do

   ! here is the question
   if (.not. master) then
      MPI_Send(body_arrays, toMaster)
   else
      do i = 1, threadCount - 1
         MPI_Recv(body_arrays_receive, senderID)
         body_arrays(senderID*objectsPerThread+1 : (senderID+1)*objectsPerThread) = &
            body_arrays_receive(senderID*objectsPerThread+1 : (senderID+1)*objectsPerThread)
      end do
   end if

   MPI_Bcast(body_arrays)
   ! ----

   t = t + dt
end do

Solution

It sounds like you want MPI_Allgather. To avoid needing a separate send buffer, you may be able to use the MPI_IN_PLACE value. That tells MPI to use the same buffer for both send and receive.

See http://mpi-forum.org/docs/mpi-2.2/mpi22-report/node99.htm#Node99
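As a rough sketch of what that can look like for one of the coordinate arrays (this is not the poster's code; the array size, the variable names, and the assumption that numberOfObjects is evenly divisible by the number of processes are made up for illustration):

program allgather_in_place_sketch
  use mpi
  implicit none

  integer, parameter :: numberOfObjects = 12
  double precision   :: body_x(numberOfObjects)
  integer :: nProcs, myRank, ierr
  integer :: objectsPerProc, myStart, i

  call MPI_Init(ierr)
  call MPI_Comm_size(MPI_COMM_WORLD, nProcs, ierr)
  call MPI_Comm_rank(MPI_COMM_WORLD, myRank, ierr)

  ! assumes numberOfObjects is evenly divisible by nProcs
  objectsPerProc = numberOfObjects / nProcs
  myStart = myRank * objectsPerProc + 1

  ! each process updates only its own chunk of body_x ...
  do i = myStart, myStart + objectsPerProc - 1
    body_x(i) = dble(i)
  end do

  ! ... then every chunk is gathered into every process's copy of body_x.
  ! MPI_IN_PLACE tells MPI to take each process's contribution from its own
  ! section of the receive buffer, so no separate send buffer is needed
  ! (sendcount and sendtype are ignored in that case).
  call MPI_Allgather(MPI_IN_PLACE, 0, MPI_DOUBLE_PRECISION,        &
  &                  body_x, objectsPerProc, MPI_DOUBLE_PRECISION, &
  &                  MPI_COMM_WORLD, ierr)

  if (myRank == 0) write(*,*) body_x

  call MPI_Finalize(ierr)
end program allgather_in_place_sketch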

OTHER TIPS

The array chunks from all processes can be combined using a call to MPI_Allgatherv. The following is a complete example in Fortran. It defines an array of 11 elements (indices -5 to 5). Each process then fills its own chunk of that array with complex numbers, and finally the call to MPI_Allgatherv pulls all the chunks together. The calculation of the chunk sizes, and of some of the parameters that need to be passed to MPI_Allgatherv, is encapsulated in the mpi_split routine.

program test
  use mpi

  implicit none

  integer, parameter :: idp = 8
  real(idp), parameter :: zero = 0.0d0
  complex(idp), parameter :: czero = cmplx(zero, zero, kind=idp)

  integer :: mpi_n_procs, mpi_proc_id, error
  integer :: i, i_from, i_to
  complex(idp) :: c(-5:5)

  integer, allocatable :: recvcount(:), displs(:)

  call MPI_Init(error)
  call MPI_Comm_size(MPI_COMM_WORLD, mpi_n_procs, error)
  call MPI_Comm_rank(MPI_COMM_WORLD, mpi_proc_id, error)

  allocate(recvcount(mpi_n_procs))
  allocate(displs(mpi_n_procs))

  i_from = -5
  i_to = 5

  ! each process covers only part of the array
  call mpi_split(i_from, i_to, counts=recvcount, displs=displs)
  write(*,*) "ID", mpi_proc_id,":", i_from, "..", i_to
  if (mpi_proc_id == 0) then
    write(*,*) "Counts: ", recvcount
    write(*,*) "Displs: ", displs
  end if

  c(:) = czero
  do i = i_from, i_to
    c(i) = cmplx(real(i, idp), real(i+1, idp), kind=idp)
  end do

  ! MPI_IN_PLACE: each process's contribution is taken directly from its own
  ! section of c (send and receive buffers must not overlap otherwise)
  call MPI_Allgatherv(MPI_IN_PLACE, 0, MPI_DOUBLE_COMPLEX, c,                 &
  &                   recvcount, displs, MPI_DOUBLE_COMPLEX, MPI_COMM_WORLD,  &
  &                   error)

  if (mpi_proc_id == 0) then
    do i = -5, 5
      write(*,*) i, ":", c(i)
    end do
  end if

  deallocate(recvcount, displs)
  call MPI_Finalize(error)

contains

  !! @description: split the range (a,b) into equal chunks, where each chunk is
  !! handled by a different MPI process
  !! @param: a        On input, the lower bound of an array to be processed. On
  !!                  output, the lower index of the chunk that the MPI process
  !!                  `proc_id` should process
  !! @param: b        On input, the upper bound of an array. On, output the
  !!                  upper index of the chunk that process `proc_id` should
  !!                  process.
  !! @param: n_procs  The total number of available processes. If not given,
  !!                  this is determined automatically from the MPI environment.
  !! @param: proc_id  The (zero-based) process ID (`0 <= proc_id < n_procs`). If
  !!                  not given, the ID of the current MPI process
  !! @param: counts   If given, must be of size `n_procs`. On output, the chunk
  !!                  size for each MPI process
  !! @param: displs   If given, must be of size `n_procs`. On output, the offset
  !!                  if the first index processed by each MPI process, relative
  !!                  to the input value of `a`
  subroutine mpi_split(a, b, n_procs, proc_id, counts, displs)

    integer,           intent(inout) :: a
    integer,           intent(inout) :: b
    integer, optional, intent(in)    :: n_procs
    integer, optional, intent(in)    :: proc_id
    integer, optional, intent(inout) :: counts(:)
    integer, optional, intent(inout) :: displs(:)

    integer :: mpi_n_procs, n_tasks, mpi_proc_id, error
    integer :: aa, bb
    real(idp) :: split_size

    mpi_n_procs = 1
    if (present(n_procs)) mpi_n_procs = n_procs
    mpi_proc_id = 0
    if (present(proc_id)) mpi_proc_id = proc_id
    if (.not. present(n_procs)) then
      call MPI_Comm_size(MPI_COMM_WORLD, mpi_n_procs, error)
    end if
    if (.not. present(proc_id)) then
      call MPI_Comm_rank(MPI_COMM_WORLD, mpi_proc_id, error)
    end if

    aa = a
    bb = b

    n_tasks = bb - aa + 1
    split_size = real(n_tasks, idp) / real(max(mpi_n_procs, 1), idp)
    a = nint(mpi_proc_id * split_size) + aa
    b = min(aa + nint((mpi_proc_id+1) * split_size) - 1, bb)

    if (present(counts)) then
      do mpi_proc_id = 0, mpi_n_procs-1
        counts(mpi_proc_id+1) = max(nint((mpi_proc_id+1) * split_size)         &
        &                           - nint((mpi_proc_id) * split_size), 0)
      end do
    end if

    if (present(displs)) then
      do mpi_proc_id = 0, mpi_n_procs-1
        displs(mpi_proc_id+1) = min(nint(mpi_proc_id * split_size), bb-aa)
      end do
    end if

  end subroutine mpi_split

end program
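
To try it, the example can be compiled with an MPI Fortran wrapper (e.g. mpif90 test.f90) and run with a few processes (e.g. mpirun -np 3 ./a.out); every rank then reports the index range it handled, and rank 0 prints the counts, the displacements, and the fully assembled array.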
Licensed under: CC-BY-SA with attribution