Here's a simplistic way to implement your own 'semaphore' (since I don't think the standard library or boost have one). This chooses a 'cooperative' approach and workers will wait for each other:
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
void the_work(int id)
{
static int running = 0;
std::cout << "worker " << id << " entered (" << running << " running)\n";
static mutex mx;
static condition_variable cv;
// synchronize here, waiting until we can begin work
{
unique_lock<mutex> lk(mx);
cv.wait(lk, phoenix::cref(running) < 3);
running += 1;
}
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";
// signal one other worker, if waiting
{
lock_guard<mutex> lk(mx);
running -= 1;
cv.notify_one();
}
}
int main()
{
thread_group pool;
for (int i = 0; i < 10; ++i)
pool.create_thread(bind(the_work, i));
pool.join_all();
}
Now, I'd say it's probably better to have a dedicated pool of n workers taking their work from a queue in turns:
#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
class thread_pool
{
private:
mutex mx;
condition_variable cv;
typedef function<void()> job_t;
std::deque<job_t> _queue;
thread_group pool;
boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (auto job = q.dequeue())
(*job)();
}
public:
thread_pool() : shutdown(false) {
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
pool.create_thread(bind(worker_thread, ref(*this)));
}
void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(std::move(job));
cv.notify_one();
}
optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;
cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
if (_queue.empty())
return none;
auto job = std::move(_queue.front());
_queue.pop_front();
return std::move(job);
}
~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}
pool.join_all();
}
};
void the_work(int id)
{
std::cout << "worker " << id << " entered\n";
// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";
}
int main()
{
thread_pool pool; // uses 1 thread per core
for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
}
PS. You can use C++11 lambdas instead boost::phoenix there if you prefer.