Question

Short version:

How do you deal with the non-atomicity of spawning a group of threads that run some custom callback (unspecified at the time of implementation)? Several possible solutions are described below; it seems like using a thread pool is the only good one. Is there a standard way to deal with this? No need to post complete C++ solutions; pseudocode or a brief description will suffice. Performance is an important aspect here.

Although it may seem trivial, I believe the code fragment below occurs in many existing applications, and many programmers (beginners, and probably some advanced ones too) may write similar constructs without even realizing the dangers. The problems are the same for pthreads, C++11 std::thread, WinAPI and probably many other low-level multithreading libraries, which is why I think it is an important question.

Long version:

I'm designing a multithreaded application and I decided to write a utility function that spawns several threads. This is probably fairly common code; it comes up in many of my applications (unless they use OpenMP):

void ParallelCall(void (*function)(int, int), int numThreads)
{
    Thread *threads = new Thread[numThreads - 1];
    for(int i = 1; i < numThreads; ++ i) {
        if(threads[i - 1].start(function, i, numThreads)) // this might fail
            abort(); // what now?
    }

    (*function)(0, numThreads);
    // use the calling thread as thread 0

    for(int i = 1; i < numThreads; ++ i)
        threads[i - 1].join();
    delete[] threads;
}

This is more pseudocode than real code, meant to illustrate the problem. A number of threads are created and started (the Thread object wraps a pthread thread). Then they do some work and finally they are joined.

Now the problem: what if, for whatever reason, some of the threads fail to start (for example because of resource exhaustion or a per-user limit)? I know how to detect that it happened, but I'm not sure how to handle it.
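For reference, this is what detection looks like with C++11 std::thread, where a failed start is reported by the constructor throwing std::system_error rather than through a return value as in the hypothetical Thread wrapper above. A minimal sketch, assuming std::thread; the helper name StartThreads is illustrative only. It shows detection, not the handling problem the question is about: the caller still has to decide what to do with the threads that did start.

```cpp
#include <cassert>
#include <system_error>
#include <thread>
#include <vector>

// Hypothetical helper: starts up to `count` threads, each running work(i).
// Stops at the first failure and returns how many threads actually started;
// the caller still owns (and must join) every thread in `threads`.
// A failed start makes the std::thread constructor throw std::system_error
// (typically with std::errc::resource_unavailable_try_again).
template <class F>
int StartThreads(std::vector<std::thread>& threads, int count, F work)
{
    assert(count >= 0);
    threads.reserve(threads.size() + count); // no reallocation inside the loop
    for (int i = 0; i < count; ++i) {
        try {
            threads.emplace_back(work, i);
        } catch (const std::system_error&) {
            return i; // threads [0, i) are running, thread i never started
        }
    }
    return count;
}
```

Usage: `int started = StartThreads(threads, n, work);` followed by joining every element of `threads`, whether or not `started == n`.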

I guess I should wait for the successfully started threads to finish and then throw an exception. However, if the code in function contains some synchronization (such as a barrier), this can easily result in a deadlock because the rest of the expected threads will never spawn.

Alternatively, I could throw an exception right away, ignoring the running threads, but then I leave the wrapper objects allocated, causing a memory leak (and the spawned threads are never joined).

Doing something like killing the running threads doesn't seem like a good idea. Frankly, I'm not sure what the result of forcibly killing a thread in a multithreaded application is; it seems the memory would be left in an undefined state, which is hard to handle, and it could itself lead to more memory leaks if the callback function allocates memory.

Inserting a wait for all the threads to start before letting them enter the callback function seems unbearable performance-wise (although it would solve the issue easily). Another option would be a pool of already spawned threads with associated FIFOs, waiting for tasks, but then there is a problem with the number of threads: I would spawn as many threads as there are logical CPUs, but what if numThreads is larger? I would essentially be reimplementing the OS scheduler in my code.

How is this commonly solved? Is there a better way? If not, is a potential deadlock (depending on what's in the callback function) better than a memory leak?


Solution

Here is how to solve this problem:

Create each thread so that it waits on a sentinel (a flag guarded by a condition variable) before it is allowed to begin the user's work function; you'll need a lambda that does the waiting and then calls the function. If any of the threads fail to start, set a flag telling the existing threads to finish immediately instead of running the user's function. In the error case, join the threads that did start. Then exit with an error code or an exception, as you wish (an exception is better).

Now your function cleans up correctly after a partial failure: it will not leak memory or leave threads unjoined.

EDIT: here is some code that does what you want, including a test. To simulate a thread failing to start, recompile with INTRODUCE_FAILURE set to 1.

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <atomic>
#include <chrono>
#include <system_error>
#include <condition_variable>

#define INTRODUCE_FAILURE 0
// implementation

void ParallelCall(void (*function)(int, int), int numThreads)
{
    std::vector<std::thread> threads;
    if (numThreads > 1)
        threads.reserve(numThreads - 1);

    std::atomic<bool> mustAbort ( false );
    std::atomic<bool> mayRun ( false );
    std::mutex conditionMutex;
    std::condition_variable runCondition;

    for(int i = 1; i < numThreads; ++ i) {
        try {
            #if INTRODUCE_FAILURE == 1
            if (i == 3) {
                throw std::system_error(99, std::generic_category(), "the test deliberately failed a thread");
            }
            #endif
            threads.emplace_back( [i, numThreads, function
                                , &mustAbort
                                , &conditionMutex
                                , &runCondition
                                , &mayRun]() {
                // wait for permission to run
                std::unique_lock<std::mutex> myLock(conditionMutex);
                runCondition.wait(myLock, [&mayRun]() -> bool {
                    return mayRun;
                });
                myLock.unlock();
                // skip the user's function if another thread failed to start
                if (!mustAbort) {
                    function(i, numThreads);
                }
            } );
        }
        catch(std::exception&) { // will be a std::system_error
            mustAbort = true;
            std::unique_lock<std::mutex> myLock(conditionMutex);
            mayRun = true;
            myLock.unlock(); // unlock through the lock object, or its destructor unlocks twice
            runCondition.notify_all();
            for(auto& t : threads) {
                t.join();
            }
            throw;
        }
    }

    // release the waiting threads
    std::unique_lock<std::mutex> myLock(conditionMutex);
    mayRun = true;
    myLock.unlock();
    runCondition.notify_all();

    function(0, numThreads);
    // use the calling thread as thread 0

    for(auto& t : threads) {
        t.join();
    }
}

// test

using namespace std;

void testFunc(int index, int extent) {
    static std::mutex outputMutex;

    unique_lock<mutex> myLock(outputMutex);
    cout << "Executing " << index << " of " << extent << endl;
    myLock.unlock();

    this_thread::sleep_for( chrono::milliseconds(2000) );

    myLock.lock();
    cout << "Finishing " << index << " of " << extent << endl;
    myLock.unlock();
}

int main()
{
    try {
        cout << "initiating parallel call" << endl;
        ParallelCall(testFunc, 10);
        cout << "parallel call complete" << endl;
    }
    catch(std::exception& e) {
        cout << "Parallel call failed because: " << e.what() << endl;
    }
    return 0;
}

Example output on success:

Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1

Executing the program....
$demo 
initiating parallel call
Executing 0 of 10
Executing 1 of 10
Executing 4 of 10
Executing 5 of 10
Executing 8 of 10
Executing 2 of 10
Executing 7 of 10
Executing 6 of 10
Executing 9 of 10
Executing 3 of 10
Finishing 1 of 10
Finishing 5 of 10
Finishing 2 of 10
Finishing 9 of 10
Finishing 8 of 10
Finishing 4 of 10
Finishing 3 of 10
Finishing 0 of 10
Finishing 6 of 10
Finishing 7 of 10
parallel call complete

Example output on failure:

Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1

Executing the program....
$demo 
initiating parallel call
Parallel call failed because: the test deliberately failed a thread: Cannot assign requested address
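One gap remains in the code above: if function(0, numThreads) throws in the calling thread, the std::vector<std::thread> is destroyed while its threads are still joinable, and std::thread's destructor then calls std::terminate. A possible remedy is a small RAII guard that joins on scope exit. This is a sketch under the assumption that the gate has already been opened (mayRun is true) by the time anything outside the try block can throw, which holds for the code above; JoinAll is a hypothetical name.

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: joins every joinable thread in the referenced vector
// when it goes out of scope, including during stack unwinding. Threads that
// were already joined (e.g. in the catch block) are skipped.
class JoinAll {
public:
    explicit JoinAll(std::vector<std::thread>& threads) : threads_(threads) {}
    ~JoinAll() {
        for (auto& t : threads_)
            if (t.joinable())
                t.join();
    }
    JoinAll(const JoinAll&) = delete;
    JoinAll& operator=(const JoinAll&) = delete;
private:
    std::vector<std::thread>& threads_;
};
```

In ParallelCall, the guard would be declared right after runCondition (`JoinAll guard(threads);`), so it is destroyed, and joins, before the mutex, condition variable and flags that the workers still use. It cannot help if the callback waits on a barrier that thread 0 never reaches.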

Finally, a plea: do not unleash your library on the world. The std::thread library is very comprehensive, and if that isn't enough there are OpenMP, TBB and others.

OTHER TIPS

How about letting the threads that did get created help out by doing the lost work before they exit their threadproc?

List stillBornWork; // shared, thread-safe list of indices whose thread never started

void ParallelCall(void (*function)(int, int), int numThreads)
{
    Thread *threads = new Thread[numThreads - 1];
    for(int i = 1; i < numThreads; ++ i) {
        if(threads[i - 1].start(function, i, numThreads)) {
            stillBornWork.Push(i);
        }
    }

    (*function)(0, numThreads);
    // use the calling thread as thread 0

    for(int i = 1; i < numThreads; ++ i)
        threads[i - 1].join();
    delete[] threads;
}

void ThreadProc(int i) {

  while(1) {
    // do the work for index i

    // Here we see if there was any work that didn't get done because its thread
    // was stillborn. In your case, the work is indicated by the integer i.
    // If we get work, loop again, else break.
    if (!stillBornWork.Pop(&i))
      break;  // no more work that wasn't done.
  }

}
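The same idea can be made concrete with std::thread. This is a minimal sketch, assuming C++11 std::thread in place of the hypothetical Thread class; StillbornWork and threadProc are illustrative names. Unlike the pseudocode, the calling thread also drains the queue after it has finished queueing failures, so no index is lost if a worker exits before a later failure is recorded, and only threads that actually started are joined.

```cpp
#include <cassert>
#include <mutex>
#include <system_error>
#include <thread>
#include <vector>

// Hypothetical mutex-protected list of indices whose thread never started.
class StillbornWork {
public:
    void Push(int i) { std::lock_guard<std::mutex> lock(m_); items_.push_back(i); }
    bool Pop(int& i) {
        std::lock_guard<std::mutex> lock(m_);
        if (items_.empty()) return false;
        i = items_.back();
        items_.pop_back();
        return true;
    }
private:
    std::mutex m_;
    std::vector<int> items_;
};

// Caveat: a callback that waits for all numThreads participants (e.g. a
// barrier) can still deadlock here, because fewer threads are running.
void ParallelCall(void (*function)(int, int), int numThreads)
{
    StillbornWork stillborn;
    std::vector<std::thread> threads;
    if (numThreads > 1) threads.reserve(numThreads - 1);

    // Runs index i, then picks up work left behind by threads that never started.
    auto threadProc = [&stillborn, function, numThreads](int i) {
        do {
            function(i, numThreads);
        } while (stillborn.Pop(i));
    };

    for (int i = 1; i < numThreads; ++i) {
        try {
            threads.emplace_back(threadProc, i);
        } catch (const std::system_error&) {
            stillborn.Push(i); // no thread for i; another thread will run it
        }
    }

    threadProc(0); // the calling thread is thread 0 and drains the queue last

    for (auto& t : threads)
        t.join(); // only threads that actually started are in the vector
}
```

Every index from 0 to numThreads - 1 is run exactly once, either by its own thread or by whichever thread pops it from the queue.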
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow