Question

Recently, I started working on a cluster to speed up my work. Currently my job is to profile a code with input data of different sizes. Previously I was doing that in a for loop for every input file. On the cluster I use MPI to run the program with each different input as a different process.

The source code of the MPI program can be found below. It runs the different processes, and there is also a "server" process that writes out the results, to avoid two processes writing to the output file simultaneously. As the code is structured now, I can only access the output if all processes finish in time. This is a problem because I try to keep the walltime of my job as low as possible so that the job starts quickly (the cluster is constantly full, so it is difficult to get a job started if I request too many resources). As a result, my job sometimes gets interrupted by the scheduler prematurely.

My idea was to add a timer in the server process and, if the current walltime gets close to the maximum (two minutes in the code below), close the file stream. That way, at least I won't lose the data that was already collected. This will not work, however, as the timer only gets updated when the server receives new data. Opening the file only when the server receives new data should be avoided, as I prefer to start with an empty output file each time I submit the job. What other options are available to ensure that I do not lose the output that has already been collected?

#include <mpi.h>
#include <stdio.h>
#include <sys/time.h>
#define RES 1

int main(int argc, char *argv[]){

  int nprocs, myid, server, ndone;
  double WallTime;
  struct timeval start, end;
  double countTime, res[4];
  FILE *fpt;

  WallTime = 1*60+59;

  MPI_Comm world;
  MPI_Group world_group;
  MPI_Status status;
  MPI_Init(&argc, &argv);
  world = MPI_COMM_WORLD;
  MPI_Comm_size(world,&nprocs);
  MPI_Comm_rank(world,&myid);
  server = nprocs-1; /* last proc is server */
  MPI_Comm_group(world, &world_group);

  if(myid == server){ /* I store the output */
    ndone = 0;
    fpt = fopen(argv[2],"wt");
    gettimeofday(&start, NULL);
    do{
      MPI_Recv(res, 4, MPI_DOUBLE, MPI_ANY_SOURCE, RES, world, &status);
      fprintf(fpt,"%d\t%10.7f\t%10.7f\t%ld\n", (int) res[0], res[1], res[2], (long int) res[3]);
      gettimeofday(&end, NULL);
      countTime = (end.tv_sec+(end.tv_usec)*1.e-6)-(start.tv_sec+(start.tv_usec)*1.e-6);
      ndone++;
    } while (ndone < (nprocs-1) && countTime < WallTime);
    fclose(fpt);
  } else if(myid<(nprocs-1)){
    /* ... do sth with the data according to myid, filling res[0..3] ... */
    MPI_Send(res, 4, MPI_DOUBLE, server, RES, world);
  }
  MPI_Finalize();
  return 0;
}

Solution

Option 1: Use non-blocking probes to check if a message is waiting and sleep a bit if not:

do {
  int flag;
  MPI_Iprobe(MPI_ANY_SOURCE, RES, world, &flag, &status);
  if (flag) {
    MPI_Recv(res, 4, MPI_DOUBLE, status.MPI_SOURCE, RES, world, &status);
    ...
    ndone++;
  }
  else
    usleep(10000);
  gettimeofday(&end, NULL);
  countTime = (end.tv_sec+(end.tv_usec)*1.e-6)-(start.tv_sec+(start.tv_usec)*1.e-6);
} while (ndone < (nprocs - 1) && countTime < WallTime);

You could skip the usleep() call (declared in <unistd.h>), but then the server process will spin in a tight loop, keeping its CPU utilisation at almost 100%. This is usually not a problem on HPC systems, where each MPI rank is bound to a separate CPU core.

Option 2: Most resource managers can be configured to deliver a Unix signal some time before the job is about to be killed. For example, both Sun/Oracle Grid Engine and LSF deliver SIGUSR2 some time before the job gets killed with SIGKILL. For SGE, one should add the -notify option to qsub to make it send SIGUSR2. The amount of time between SIGUSR2 and the following SIGKILL is configurable by the SGE admin on a per-queue basis. LSF sends SIGUSR2 when the job end time is reached and if the job does not terminate within 10 minutes after that, it sends SIGKILL.
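
As a minimal sketch (the handler and flag names are my own; the exact signal depends on your scheduler, SIGUSR2 in the SGE/LSF cases above), the warning signal can be turned into a flag that the server loop checks:

#include <signal.h>
#include <string.h>

/* set asynchronously by the handler, polled in the server loop */
static volatile sig_atomic_t time_almost_up = 0;

static void on_warning_signal(int sig){
  (void)sig;
  time_almost_up = 1; /* only async-signal-safe work in here */
}

/* call once at startup, e.g. right after MPI_Init() */
static void install_warning_handler(void){
  struct sigaction sa;
  memset(&sa, 0, sizeof(sa));
  sa.sa_handler = on_warning_signal;
  sigemptyset(&sa.sa_mask);
  sigaction(SIGUSR2, &sa, NULL);
}

The server loop condition then becomes ndone < (nprocs-1) && countTime < WallTime && !time_almost_up, and on SGE you would submit with qsub -notify so that SIGUSR2 is actually delivered.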

Option 3: If your resource manager is uncooperative and not sending warning signals before killing your job, you could simply send yourself SIGALRM. You would usually do the following:

  • create a timer using timer_create();
  • (re-)arm the timer using timer_settime();
  • destroy the timer using timer_delete() in the end.

You could either program the timer to expire shortly before the total wall-clock time (but that is a bad programming practice since you have to match that value to the wall-clock time requested with the resource manager) or you could have the timer fire at short intervals, e.g. 5 mins, and then rearm it every time.
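
For illustration, arming such a timer could look like the sketch below (the helper name arm_walltime_timer is my own choice; on older glibc you need to link with -lrt for the POSIX timer functions):

#include <signal.h>
#include <string.h>
#include <time.h>

/* hypothetical helper: deliver SIGALRM once, 'seconds' from now */
static timer_t arm_walltime_timer(time_t seconds){
  timer_t tid;
  struct sigevent sev;
  struct itimerspec its;

  memset(&sev, 0, sizeof(sev));
  sev.sigev_notify = SIGEV_SIGNAL; /* notify by raising a signal */
  sev.sigev_signo = SIGALRM;
  timer_create(CLOCK_MONOTONIC, &sev, &tid);

  memset(&its, 0, sizeof(its));
  its.it_value.tv_sec = seconds; /* one-shot: it_interval stays zero */
  timer_settime(tid, 0, &its, NULL);
  return tid;
}

The SIGALRM handler would be installed the same way as the SIGUSR2 handler sketched under option 2. To rearm, call timer_settime() again from the main loop; call timer_delete(tid) before MPI_Finalize() when you are done.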

Options 2 and 3 require that you write and install a signal handler for the corresponding signal(s). The nice thing about signals is that they are usually delivered asynchronously, even if your code is stuck inside a blocking MPI call like MPI_Recv(). I would consider this an advanced topic and recommend that you stick to option 1 for now, keeping in mind that options 2 and 3 exist.

Option 4: Some MPI libraries support checkpoint/restart of the running jobs. Checkpointing creates a snapshot of your MPI job's running state and then the state can be restored with special mpiexec (or whatever the name of the MPI launcher, if any) command-line flags. This method requires zero changes to your program's source code but is usually not widely available, especially on cluster setups.

Other tips

You are using buffered file I/O via fprintf(). If an asynchronous kill signal arrives from the job scheduler, it will abort the job and glibc won't have the opportunity to flush its file buffers. You may be tempted to call fflush() from a signal handler, but fflush() is not async-signal-safe.

Here are a couple of suggestions that avoid getting too complex:

Unbuffered I/O:

A simple solution would be to switch the file stream to unbuffered mode. You can do this via:

setbuf(filehandle, NULL);

Since this is unbuffered, glibc won't be performing write-combining. If the fprintf()s are infrequent, it won't be a problem. But if you're writing many short fprintf() calls, this may not be a good option for performance.

Periodically flush the file contents

The glibc fflush() function pushes out the data in the buffer. It mimics the unbuffered I/O case only if you call fflush() after each fprintf(), but it provides a bit more flexibility: since it appears that you cannot rely on a maximum MPI_Recv() time, you might consider periodically flushing the file buffer instead.

One way to do this is to spawn a separate thread with pthread_create() and have the new thread periodically call fflush(filehandle). Once a second should be a good frequency. You will need to take some care to ensure that the file handle remains valid across the two threads.
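
A minimal sketch of such a flusher thread follows (the names flusher and keep_flushing are my own; POSIX stdio streams are internally locked, so fflush() can run concurrently with the server's fprintf()). Compile with -pthread:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static volatile int keep_flushing = 1; /* cleared before joining the thread */

static void *flusher(void *arg){
  FILE *fp = arg; /* the output stream opened by the server */
  while(keep_flushing){
    sleep(1); /* once a second, as suggested above */
    fflush(fp); /* push buffered output out to the kernel */
  }
  return NULL;
}

In the server rank you would start the thread right after fopen() with pthread_create(&tid, NULL, flusher, fpt), then set keep_flushing = 0 and pthread_join(tid, NULL) before fclose(). A kill signal can still arrive between two flushes, so at most about one second of output remains at risk.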

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow