Question

I was trying to implement a checkpointing scheme for a multithreaded program, based on fork combined with setjmp/longjmp. I was hoping my solution would work but, as expected, it didn't. The code is shown below with an example usage for checkpoint/rollback.

The main idea is to allocate the stacks for the threads myself, which is done by the function pthread_create_with_stack, and then simply call fork from the main thread. The forked process (the checkpoint) is suspended at the beginning; when it is awoken (rolling back), the main thread of the forked process recreates the threads by calling pthread_create with the same stacks as the threads in the original process. A longjmp is also done at the beginning of the thread routine, so as to jump to the same point in the code where the process was forked as a checkpoint. Note that all setjmp calls are made inside the function my_pthread_barrier_wait, so that no thread holds a lock at that point.

I think the problem here is setjmp/longjmp. Will getcontext/setcontext/makecontext help here, or anything else? Can setjmp/longjmp even be used here in a way that works? Any solution will be greatly appreciated.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <setjmp.h>

/* When defined, the checkpoint/rollback code paths (signal handler setup,
 * setjmp/longjmp, fork-based checkpoint) are compiled in. */
#define PERFORM_JMP

/* Number of workers taking part in the barrier, including the main thread
 * (which runs BusyWork() directly as worker 0). */
#define NUM_THREADS 4

/* Heap-allocated stack of each created thread (set in pthread_create_with_stack). */
void *stackAddr[NUM_THREADS];
/* Handles of the created threads; slot 0 is not written by the code in this file. */
pthread_t thread[NUM_THREADS];
/* Per-worker jump buffers filled by setjmp() in my_pthread_barrier_wait(). */
jmp_buf buf[NUM_THREADS];
/* Per-thread attributes carrying the custom stack; reused by checkpoint()
 * when it re-creates the threads in the forked child. */
pthread_attr_t attr[NUM_THREADS];
/* Barrier shared by all NUM_THREADS workers. */
pthread_barrier_t bar;
/* Posted by sig_handler(); the forked checkpoint child blocks on it. */
sem_t sem;
/* Result of the most recent checkpoint() call; 0 until one has been taken. */
pid_t cp_pid;
/* Set to 1 in the forked child once it has been woken up. */
int rollbacked;
/* Incremented by worker 0 on each my_pthread_barrier_wait() call made
 * while rollbacked is 0. */
int iter;
/* Worker ids; the address of each element is passed as the thread argument. */
long thread_id[NUM_THREADS];

/* Worker routine, defined below; declared here because checkpoint() uses it. */
void *BusyWork(void *t);

/*
 * SIGUSR1 handler: posts `sem` so that the process blocked in sem_wait()
 * inside checkpoint() can continue.
 *
 * Only async-signal-safe functions are called: the diagnostic is emitted
 * with write(2) instead of printf(), which must not be used from a signal
 * handler, and sem_post() is async-signal-safe per POSIX.
 *
 * signum: number of the delivered signal (unused).
 */
void sig_handler(int signum)
{
    static const char msg[] = "signal_handler posting sem!\n";
    ssize_t written;

    (void)signum;

    /* Best-effort diagnostic; a failed write cannot be handled usefully here. */
    written = write( STDOUT_FILENO, msg, sizeof msg - 1 );
    (void)written;

    sem_post( &sem );
}

int pthread_create_with_stack( void *(*start_routine) (void *), int tid )
{
    const size_t STACKSIZE = 0xC00000; //12582912
    size_t i;
    pid_t pid;
    int rc;

    printf( "tid = %d\n", tid );

    pthread_attr_init( &attr[tid] );
    stackAddr[tid] = malloc(STACKSIZE);
    pthread_attr_setstack( &attr[tid], stackAddr[tid], STACKSIZE );

    thread_id[tid] = tid;
    rc = pthread_create( &thread[tid], &attr[tid], start_routine, (void*)&thread_id[tid] );

    if (rc) 
    {
        printf("ERROR; return code from pthread_create() is %d\n", rc);
        exit(-1);
    }

    return rc;
}

/*
 * Take a checkpoint by forking the process.
 *
 * The child blocks on `sem` until the SIGUSR1 handler posts it. Once woken
 * it sets `rollbacked`, re-creates worker threads 1..NUM_THREADS-1 with the
 * saved attributes in attr[] (and therefore the same stacks), and returns.
 *
 * Returns:
 *   - in the parent: the pid of the suspended checkpoint child;
 *   - in the awoken child: 1;
 *   - 0 if fork() failed, so that a caller storing the result in cp_pid
 *     sees "no checkpoint" rather than an indeterminate or negative pid.
 *
 * A pthread_create() failure in the child terminates it with exit(-1).
 */
pid_t checkpoint()
{
    pid_t pid;
    int t, rc;

    switch (pid=fork())
    {
    case -1:
        perror("fork");
        /* Previously fell off the end of the function without a value. */
        return 0;
    case 0:         // child process starts
        sem_wait( &sem );
        rollbacked = 1;
        printf( "case 0: rollbacked = 1, my pid is %d\n", getpid() );
        for( t = 1; t < NUM_THREADS; t++ )
        {
            printf( "checkpoint: creating thread %d again\n", t );
            rc = pthread_create( &thread[t], &attr[t], BusyWork, (void*)&thread_id[t] );
            if (rc)
            {
                printf("ERROR; return code from pthread_create() is %d\n", rc);
                exit(-1);
            }
        }
        return 1;  // child process ends
    default:        // parent process starts
        return pid;
    }
}

/*
 * Hand control over to the checkpoint process and terminate this one.
 *
 * Sends SIGUSR1 to `pid` and then exits. This function never returns.
 *
 * pid: process id of the suspended checkpoint process.
 *
 * Exit status is 0 when the signal was sent, EXIT_FAILURE when kill()
 * failed (in which case the checkpoint was not woken up).
 */
void restart_from_checkpoint( pid_t pid )
{
    printf( "Restart_from_checkpoint, sending signal to %d!\n", pid );
    if ( kill( pid, SIGUSR1 ) == -1 )
    {
        /* Do not report success if the checkpoint could not be signalled. */
        perror( "kill" );
        exit( EXIT_FAILURE );
    }
    exit( 0 );
}

/*
 * Either refresh the checkpoint or roll back to the existing one.
 *
 * sig_diff: non-zero requests a rollback to the current checkpoint;
 *           zero replaces the current checkpoint with a fresh one.
 *
 * If no valid checkpoint exists yet (cp_pid <= 0), one is taken regardless
 * of sig_diff. Only strictly positive pids are ever passed to kill():
 * zero or negative values address process groups or all processes rather
 * than a single checkpoint child.
 */
void take_checkpoint_or_rollback( int sig_diff )
{
    if ( cp_pid <= 0 )
    {
        cp_pid = checkpoint();
        return;
    }

    if ( sig_diff )
    {
        printf( "rollbacking\n" );
        if ( !rollbacked )
            restart_from_checkpoint( cp_pid );
        return;
    }

    /* Discard the old checkpoint and reap it so it does not stay a zombie. */
    if ( kill( cp_pid, SIGKILL ) == 0 )
        waitpid( cp_pid, NULL, 0 );
    else
        perror( "kill" );

    cp_pid = checkpoint();
    printf( "%d: cp_pid = %d!\n", getpid(), cp_pid );
}

/*
 * Two-phase barrier called by every worker once per loop iteration.
 *
 * tid:  index of the calling worker; selects buf[tid]. Worker 0 additionally
 *       drives the checkpoint/rollback logic.
 * pbar: barrier to wait on (twice).
 *
 * With PERFORM_JMP defined, the work between the two barrier waits is:
 * worker 0 takes a checkpoint or triggers a rollback, and every worker
 * stores a setjmp() context in buf[tid], which BusyWork() longjmp()s to
 * when `rollbacked` is set.
 * NOTE(review): that longjmp() is issued from a thread re-created by
 * checkpoint() in the forked child, not from the thread that called
 * setjmp() here -- verify that this is valid before relying on it.
 */
void my_pthread_barrier_wait( int tid, pthread_barrier_t *pbar )
{
    /* First phase: wait until every worker has arrived. */
    pthread_barrier_wait( pbar );
#ifdef PERFORM_JMP   
    if ( tid == 0 )
    {
        /* Skipped once this process is itself a resumed checkpoint. */
        if ( !rollbacked )
        {
            /* The argument is non-zero only on the call that raises iter to 4;
             * that requests a rollback instead of a new checkpoint. */
            take_checkpoint_or_rollback( ++iter == 4 );
        }
    }
    /* Record the resume point; both the direct return (0) and the
     * longjmp() return (non-zero) continue with the same code below. */
    if ( setjmp( buf[tid] ) != 0 ) {}
    else {}
    printf( "%d: %d is waiting at the second barrier!\n", getpid(), tid );
#endif
    /* Second phase: all workers leave together. */
    pthread_barrier_wait( pbar );
}

/*
 * Worker routine, run by each created thread and called directly by main()
 * as worker 0.
 *
 * t: pointer to a long holding the worker's id.
 *
 * If `rollbacked` is set (and PERFORM_JMP is defined) the worker jumps
 * straight to the context saved in buf[tid] instead of starting over.
 * Otherwise it runs 10 iterations, accumulating (tid+1)*i into `result`
 * and synchronising with the other workers after each step.
 *
 * Returns NULL, so the status obtained through pthread_join() is well
 * defined (the function previously ended without returning a value).
 */
void *BusyWork(void *t)
{
   /* Locals are volatile because their values are used after a longjmp(). */
   volatile int i;
   volatile long tid = *((long*)t);
   volatile double result = 0.0;

   printf( "thread %ld in BusyWork!\n", tid );
#ifdef PERFORM_JMP
   if ( rollbacked )
   {
    printf( "hmm, thread %ld is now doing a longjmp, goodluck!\n", tid );
    longjmp( buf[tid], 1 );
   }
#endif
   printf("Thread %ld starting...\n",tid);
   for ( i = 0; i < 10; i++)
   {
      result += (tid+1) * i;
      printf( "%d: tid %ld: result = %g\n", getpid(), tid, result );
      my_pthread_barrier_wait(tid, &bar);
   }
   printf("Thread %ld done. Result = %g\n", tid, result);
   return NULL;
}

int main (int argc, char *argv[])
{
   int rc;
   long t;
   void *status;

   /* Initialize and set thread detached attribute */
   pthread_barrier_init(&bar, NULL, NUM_THREADS);
#ifdef PERFORM_JMP   
   signal(SIGUSR1, sig_handler);
   sem_init( &sem, 0, 0 );
#endif
   for( t = 1; t < NUM_THREADS; t++ ) 
   {
      printf( "Main: creating thread %ld\n", t );
      rc = pthread_create_with_stack( BusyWork, t ); // This is the line 52
      if (rc) 
      {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }

   thread_id[0] = 0;
   BusyWork( &thread_id[0] );

   /* Free attribute and wait for the other threads */
   for(t=1; t<NUM_THREADS; t++) 
   {
      rc = pthread_join(thread[t], &status);
      if (rc) 
      {
         printf("ERROR; return code from pthread_join() is %d\n", rc);
         exit(-1);
      }
      printf("Main: completed join with thread %ld having a status"   
            "of %ld\n",t,(long)status);
    }

    printf("Main: program completed. Exiting.\n");
    pthread_exit(NULL);
}
Était-ce utile?

La solution

What you're trying to do is simply impossible. fork is fundamentally incompatible with synchronization. Even if you could recreate threads reliably in the child process, they would have new thread ids, and thus they would not be the owners of the locks they're supposed to own.

The only way to do checkpointing is with advanced operating system support for it. This would have to include separate pid namespaces, so that the checkpointed copy of the program would have the same pid, and all its threads would have the same thread ids. Even then, if it's performing communication with other processes or the outside world, it won't work. I believe there are some tools for doing this on Linux, but I'm not familiar with them, and at this point you're getting to the level of hacks where it's appropriate to ask if there's a better way to achieve what you're trying to do.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top