Question

I have code similar to the following:

boost::thread myThread;
unsigned char readbuffer[bignumber];
unsigned char writebuffer[bignumber];

for(int i=0; i<bignumber; ++i){
  functiondostuff();
  for(int j=0; j<2; ++j){
    functiondomorestuff();
    myThread = boost::thread(&myClass::myFunction, this, j, i);
  }     
}

myFunction reads from one buffer and writes to another. It never writes to the same location in the write buffer twice. Am I doing something fundamentally wrong with threads here? Is it bad to create threads in a loop while reusing the same thread variable? It runs smoothly for a while, and then I get the following exception:

terminate called after throwing an instance of 'boost::exception_detail::clone_impl<boost::exception_detail::error_info_injector<boost::thread_resource_error> >'
  what():  boost::thread_resource_error: Resource temporarily unavailable
Aborted

What does this exception mean? Any ideas would be helpful.

Solution

There's a limit on the number of threads you can create per process.

On linux, for example,

cat /proc/sys/kernel/threads-max

tells you the current system-wide maximum. The default is the number of memory pages/4, so on my system it's 513785, but it may be much, much lower on another box. E.g. on my mail server box (512 MB RAM) it's only 7295.
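If you want to check that value from inside the program, a minimal sketch could look like the following. The helper name `read_limit` is made up for illustration, and the default path only exists on Linux; on other systems the function simply reports -1.

```cpp
#include <fstream>
#include <string>

// Hypothetical helper: returns the number stored in `path`,
// or -1 if the file is missing or does not start with a number.
long read_limit(const std::string& path = "/proc/sys/kernel/threads-max")
{
    std::ifstream in(path);
    long value = -1;
    if (!(in >> value))
        return -1;   // file not readable (e.g. not on Linux)
    return value;
}
```

Calling `read_limit()` with no argument reads the same file as the `cat` command above.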

You could raise the limit, but in practice that would be pointless because the OS can't schedule that many threads effectively. So, instead, try using a thread pool.
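Short of a full pool, one simple way to cap the thread count is to split the index range into one chunk per core. The sketch below uses std::thread rather than boost::thread so that it builds without Boost. `transform_range` is a hypothetical stand-in for myFunction, and the approach assumes every element can be processed independently, which may not hold for the real code in the question.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for myFunction: reads in[i], writes out[i].
void transform_range(const std::vector<unsigned char>& in,
                     std::vector<unsigned char>& out,
                     std::size_t begin, std::size_t end)
{
    for (std::size_t i = begin; i < end; ++i)
        out[i] = static_cast<unsigned char>(in[i] + 1);
}

// Processes the whole buffer with at most one thread per core.
std::vector<unsigned char> process_in_chunks(const std::vector<unsigned char>& in)
{
    std::vector<unsigned char> out(in.size());
    // hardware_concurrency() may return 0 when the count is unknown
    const unsigned workers = std::max(1u, std::thread::hardware_concurrency());
    const std::size_t chunk = (in.size() + workers - 1) / workers;

    std::vector<std::thread> threads;
    for (unsigned w = 0; w < workers; ++w) {
        const std::size_t begin = std::min(in.size(), w * chunk);
        const std::size_t end = std::min(in.size(), begin + chunk);
        if (begin == end)
            break;   // nothing left to hand out
        threads.emplace_back(transform_range, std::cref(in), std::ref(out), begin, end);
    }
    for (auto& t : threads)
        t.join();    // every thread is joined before the result is returned
    return out;
}
```

The number of live threads never exceeds the core count, no matter how large the buffer is.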

Oh, PS: detach()-ing the threads will help (a lot) with conserving resources. pthreads might be blocking thread creation well before the OS limit is reached, because it needs to allocate overhead for tracking the active threads. Detaching frees that overhead up (and removes the error of not joining all threads before program exit).
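A minimal sketch of detaching, written with std::thread instead of boost::thread so it compiles without Boost (both have a `detach()` member). The helper name `run_detached` and the shared counter are assumptions made for illustration only.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// Hypothetical helper: runs `n` trivial tasks, each on its own detached
// thread, and returns how many of them completed.
int run_detached(int n)
{
    // Shared ownership keeps the counter alive for as long as any thread needs it.
    auto done = std::make_shared<std::atomic<int>>(0);
    for (int i = 0; i < n; ++i) {
        std::thread t([done] { done->fetch_add(1); });
        t.detach();   // no join needed; resources are released when the thread exits
    }
    // Detached threads cannot be joined, so completion is tracked by the counter.
    while (done->load() < n)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return done->load();
}
```

Note that detaching does not limit how many threads are alive at the same time, so long-running tasks can still exhaust the limit.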

UPDATE: Crazy Friday bonus: a thread pool that auto-scales to the number of cores your system has:

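#include <boost/thread.hpp>
#include <boost/atomic.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <iostream>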

using namespace boost;
using namespace boost::phoenix::arg_names;

boost::atomic_size_t counter(0ul);

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

static constexpr size_t bignumber = 1 << 20;

class myClass 
{
    //unsigned char readbuffer[bignumber];
    //unsigned char writebuffer[bignumber];
    void functiondostuff() { }
    void functiondomorestuff() { }

    thread_pool pool; // uses 1 thread per core

  public:
    void wreak_havoc()
    {
        std::cout << "enqueuing jobs... " << std::flush;
        for(size_t i=0; i<bignumber; ++i)
        {
            functiondostuff();
            for(int j=0; j<2; ++j) {
                functiondomorestuff();
                pool.enqueue(bind(&myClass::myFunction, this, j, i));
            }     
        }
        std::cout << "done\n";
    }

  private:
    void myFunction(int i, int j)
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        counter += 1;
    }
};

int main()
{
    myClass instance;
    instance.wreak_havoc();

    size_t last = 0;
    while (counter < (2*bignumber))
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        if ((counter >> 4u) > last)
        {
            std::cout << "Progress: " << counter << "/" << (bignumber*2) << "\n";
            last = counter >> 4u;
        }
    }
}

OTHER TIPS

First, I wasn't explicitly cleaning up my threads with thread.join() or thread.detach(), so the total number of threads grew out of control until thread creation threw an exception.

Also, my inner loop created two threads but kept only one handle. So I lost control of one thread on each iteration of the outer loop, which pushed the total number of threads past the limit and caused the exception.

To retain unique handles to the threads created in the inner loop, I added them to a boost::thread_group and called join_all() on it. I suppose I could also have pushed the handles onto a vector and then joined them one by one.

boost::thread_group myThreadGroup;
unsigned char readbuffer[bignumber];
unsigned char writebuffer[bignumber];

for(int i=0; i<bignumber; ++i){
  functiondostuff();
  for(int j=0; j<2; ++j){
    functiondomorestuff();
    myThreadGroup.create_thread(boost::bind(&myClass::myFunction, this, i, j));
  }    
  functiondostuffagain();
  myThreadGroup.join_all(); 
}
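The vector alternative mentioned above could look roughly like this. It is a sketch with std::thread standing in for boost::thread, and `run_batches` and its counter are hypothetical placeholders for the real loop body and myFunction.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: for each outer iteration, start `inner` threads,
// keep every handle, and join them all before moving on.
int run_batches(int outer, int inner)
{
    std::atomic<int> calls(0);
    for (int i = 0; i < outer; ++i) {
        std::vector<std::thread> handles;
        for (int j = 0; j < inner; ++j)
            handles.emplace_back([&calls] { ++calls; });   // stand-in for myFunction
        for (auto& t : handles)
            t.join();   // no thread outlives its outer iteration
    }
    return calls.load();
}
```

At most `inner` threads exist at any moment, so the total never approaches the limit.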
Licensed under: CC-BY-SA with attribution