How you solve this problem:
Create each thread in such a way that it waits on a sentinel before it's allowed to begin the user's work function (you'll need a lambda that calls it) If any of the threads fail to start, set a flag to indicate that existing threads should all finish immediately rather than perform the user's function. In the error case, join the threads that did start. Then exit with an error code or exception as you wish (exception is better).
Now your function is thread safe and will not leak memory.
EDIT: here's some code that does what you want, including a test.
If you want to force a simulation of a thread failing, recompile with INTRODUCE_FAILURE
defined to 1
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <memory>
#include <atomic>
#include <system_error>
#include <condition_variable>
#define INTRODUCE_FAILURE 0
// implementation
void ParallelCall(void (*function)(int, int), int numThreads)
{
std::vector<std::thread> threads;
threads.reserve(numThreads-1);
std::atomic<bool> mustAbort ( false );
std::atomic<bool> mayRun ( false );
std::mutex conditionMutex;
std::condition_variable runCondition;
for(int i = 1; i < numThreads; ++ i) {
try {
#if INTRODUCE_FAILURE == 1
if (i == 3) {
throw std::system_error(99, std::generic_category(), "the test deliberately failed a thread");
}
#endif
threads.emplace_back( std::thread{ [i, numThreads, function
, &mustAbort
, &conditionMutex
, &runCondition
, &mayRun]()->int {
std::unique_lock<std::mutex> myLock(conditionMutex);
runCondition.wait(myLock, [&mayRun]()->bool {
return mayRun;
});
myLock.unlock();
// wait for permission
if (!mustAbort) {
function(i, numThreads);
}
return 0;
}} );
}
catch(std::exception& e) { // will be a std::system_error
mustAbort = true;
std::unique_lock<std::mutex> myLock(conditionMutex);
mayRun = true;
conditionMutex.unlock();
runCondition.notify_all();
for(auto& t : threads) {
t.join();
}
throw;
}
}
std::unique_lock<std::mutex> myLock(conditionMutex);
mayRun = true;
conditionMutex.unlock();
runCondition.notify_all();
function(0, numThreads);
// use the calling thread as thread 0
for(auto& t : threads) {
t.join();
}
}
// test
using namespace std;
void testFunc(int index, int extent) {
static std::mutex outputMutex;
unique_lock<mutex> myLock(outputMutex);
cout << "Executing " << index << " of " << extent << endl;
myLock.unlock();
this_thread::sleep_for( chrono::milliseconds(2000) );
myLock.lock();
cout << "Finishing " << index << " of " << extent << endl;
myLock.unlock();
}
int main()
{
try {
cout << "initiating parallel call" << endl;
ParallelCall(testFunc, 10);
cout << "parallel call complete" << endl;
}
catch(std::exception& e) {
cout << "Parallel call failed because: " << e.what() << endl;
}
return 0;
}
Example output on success:
Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1
Executing the program....
$demo
initiating parallel call
Executing 0 of 10
Executing 1 of 10
Executing 4 of 10
Executing 5 of 10
Executing 8 of 10
Executing 2 of 10
Executing 7 of 10
Executing 6 of 10
Executing 9 of 10
Executing 3 of 10
Finishing 1 of 10
Finishing 5 of 10
Finishing 2 of 10
Finishing 9 of 10
Finishing 8 of 10
Finishing 4 of 10
Finishing 3 of 10
Finishing 0 of 10
Finishing 6 of 10
Finishing 7 of 10
parallel call complete
Example output on failure:
Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1
Executing the program....
$demo
initiating parallel call
Parallel call failed because: the test deliberately failed a thread: Cannot assign requested address
Finally a plea - do not unleash your library on the world. The std::thread library is very comprehensive, and if that isn't enough we have OpenMP, TBB, etc. etc.