Question

I have a reinforcement learning project. For this I created a vectorized environment in C++, which is a handler for multiple instances of a simple game. It is highly parallelizable. Each worker can work on their own batch of instances, there is no information exchange between them. (For the most part.)

My current approach is to use std::async in the following manner:

[...]
auto foo = [&](int thread_id) { 
    size_t chunk_size = env_count / (worker_count) + 1;
    size_t my_start = thread_id * chunk_size;
    size_t my_end = std::min(my_start + chunk_size, env_count);

    // Take care of instances between the given interval
    move_piece(my_start, my_end, ...);
    raise_turns(my_start, my_end, ...);
    raise_score(my_start, my_end, ...);
    do_magic(my_start, my_end, ...);
         
    return;
};

std::vector<std::future<void>> futures;
futures.reserve(worker_count);

for (auto p = 0; p < worker_count; ++p) {
    futures.emplace_back(std::async(std::launch::async, foo, p));
}

// Need to wait for all to finish
for (auto &elem : futures) {
    elem.wait();
}

This code runs each time the envs take an action.

I was wondering if the constant re-starting of threads (e.g. re-creating the std::async tasks each time the envs take an action) causes significant overhead, so I took a look at thread pooling. I implemented this solution (with extra flags to know when all threads are finished), which uses a single job queue and multiple workers. It was slower than using std::async.

Is there a possibly better way to utilize concurrency? I was thinking about a one-queue-per-worker thread pool implementation, but you might be able to give me an even better approach.

Thanks in advance!

P.S.:

As extra info, I tried to use data-oriented design, so my functions share their structure with the following toy example:

struct A {
  int x;
  int y;
};

struct B {
  int z;
};

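// Vectors are taken by reference: by-value parameters would copy both
// vectors on every call and discard the writes to out.
void foo(size_t my_start, size_t my_end, const std::vector<A>& in, std::vector<B>& out){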
  for (size_t ii = my_start; ii < my_end; ++ii){
    out[ii].z = in[ii].x + in[ii].y;
  }
}


Solution

I was wondering if the constant re-starting of threads causes significant overhead, so I took a look at thread pooling.

Constant re-starting of threads is indeed a bad idea, but std::async doesn't tell you how it manages its threads. It certainly isn't required to launch a new thread for every task. Perhaps your implementation is already managing a thread pool better than the one you wrote yourself.

If it's performing well enough, then you got everything you need for free already. Congratulations! You have a good implementation. Use it.

  • Edit to clarify since this seems to be confusing: when I said "std::async doesn't tell you how it manages its threads. It certainly isn't required to launch a new thread for every task." I did indeed mean that each call to std::async(std::launch::async, ...) is not required to launch a new thread. It is permitted to use a persistent pool of reusable threads, and it's possible that your implementation is already doing this.

    The requirement is that the function is invoked as if in a new thread of execution, where "as if" is the normal standard weasel wording for "feel free to do something that performs better so long as it behaves the same".
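Whether the launch overhead matters on a given implementation can be measured rather than guessed. The following is a minimal sketch, not code from the question: mean_async_step_micros is a hypothetical helper that launches worker_count empty std::async tasks per round, so the time it reports is almost entirely launch-and-join overhead.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical helper (not from the question): runs `steps` rounds of
// `worker_count` empty std::async tasks and returns the mean wall-clock
// time per round in microseconds.
double mean_async_step_micros(std::size_t worker_count, std::size_t steps) {
    assert(worker_count > 0 && steps > 0);
    using clock = std::chrono::steady_clock;

    std::vector<std::future<void>> futures;
    futures.reserve(worker_count);

    const auto begin = clock::now();
    for (std::size_t s = 0; s < steps; ++s) {
        futures.clear();
        // Launch one empty task per worker, as the question does per action.
        for (std::size_t p = 0; p < worker_count; ++p) {
            futures.emplace_back(std::async(std::launch::async, [] {}));
        }
        // Join all tasks before starting the next round.
        for (auto& f : futures) {
            f.get();
        }
    }
    const std::chrono::duration<double, std::micro> elapsed =
        clock::now() - begin;
    return elapsed.count() / static_cast<double>(steps);
}
```

Comparing this number with the time one real environment step takes shows how much a pool could save at best. If the overhead is a small fraction of the step, a hand-written pool is unlikely to help.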

Is there a possibly better way to utilize concurrency? I was thinking about a one-queue-per-worker thread pool implementation, but you might be able to give me an even better approach.

If you have a queue per thread, then the producer needs to decide which thread does the work. This is generally worse, but might be fine if all the tasks take the same time. If you think the single queue is a significant bottleneck, then you should profile to figure out why. It shouldn't be holding the lock very long.

If your tasks are very fine-grained, the queue can be a bottleneck, but that's because (un)locking the mutex is expensive compared to the task itself. Just make your tasks bigger or batch them together to amortize the synchronization cost.
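For the workload in the question, the coarsest possible batching is one task per worker per step. A minimal sketch of that idea follows, assuming the job does not throw. StepWorkers, run_step and chunk_bounds are hypothetical names, not part of any library or of the question's code. The threads are created once, and each step costs two lock acquisitions per worker instead of one per small task.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Half-open range [first, second) of items owned by worker `id`.
std::pair<std::size_t, std::size_t> chunk_bounds(
        std::size_t id, std::size_t worker_count, std::size_t item_count) {
    assert(worker_count > 0 && id < worker_count);
    return {id * item_count / worker_count,
            (id + 1) * item_count / worker_count};
}

class StepWorkers {
public:
    explicit StepWorkers(std::size_t worker_count) {
        assert(worker_count > 0);
        threads_.reserve(worker_count);
        for (std::size_t id = 0; id < worker_count; ++id)
            threads_.emplace_back([this, id] { worker_loop(id); });
    }
    ~StepWorkers() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        start_cv_.notify_all();
        for (auto& t : threads_) t.join();
    }
    StepWorkers(const StepWorkers&) = delete;
    StepWorkers& operator=(const StepWorkers&) = delete;

    // Runs job(worker_id) once on every worker and blocks until all return.
    // Assumption: job does not throw.
    void run_step(const std::function<void(std::size_t)>& job) {
        std::unique_lock<std::mutex> lock(mutex_);
        job_ = &job;
        pending_ = threads_.size();
        ++generation_;  // signals a new step to the workers
        start_cv_.notify_all();
        done_cv_.wait(lock, [this] { return pending_ == 0; });
        job_ = nullptr;
    }

private:
    void worker_loop(std::size_t id) {
        std::size_t seen = 0;  // last step this worker has executed
        for (;;) {
            const std::function<void(std::size_t)>* job = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                start_cv_.wait(lock, [&] {
                    return stopping_ || generation_ != seen;
                });
                if (stopping_) return;
                seen = generation_;
                job = job_;
            }
            (*job)(id);  // the actual work runs without holding the lock
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (--pending_ == 0) done_cv_.notify_one();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable start_cv_, done_cv_;
    const std::function<void(std::size_t)>* job_ = nullptr;
    std::size_t pending_ = 0;
    std::size_t generation_ = 0;
    bool stopping_ = false;
    std::vector<std::thread> threads_;
};
```

A step would then call something like workers.run_step([&](std::size_t id) { auto r = chunk_bounds(id, worker_count, env_count); /* move_piece(r.first, r.second, ...) etc. */ }); with the StepWorkers object kept alive across steps. Whether this beats std::async on a particular platform still has to be measured.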

OTHER TIPS

The only thing that worries me, for now, is error handling and what happens if any of the tasks hang. In my experience, you should only use wait() for short functions whose result you do not really care about.

I would suggest using std::future::wait_for, and promises that report failures via std::promise::set_exception.
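A minimal sketch of both suggestions, with hypothetical helper names (describe_outcome and worker_body are made up for illustration). The first function waits with a timeout and uses get() to rethrow any stored exception. The second shows a worker reporting a failure through a promise.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Hypothetical helper: waits up to `timeout` for the task behind `fut`.
// Returns "ok", "timeout", or "error: <message>".
std::string describe_outcome(std::future<void>& fut,
                             std::chrono::milliseconds timeout) {
    assert(fut.valid());
    if (fut.wait_for(timeout) != std::future_status::ready) {
        return "timeout";  // the task is still running (or hung)
    }
    try {
        fut.get();  // unlike wait(), get() rethrows a stored exception
        return "ok";
    } catch (const std::exception& e) {
        return std::string("error: ") + e.what();
    }
}

// Hypothetical worker body: forwards success or failure to the promise.
void worker_body(std::promise<void> result, bool should_fail) {
    try {
        if (should_fail) {
            throw std::runtime_error("task failed");
        }
        result.set_value();
    } catch (...) {
        // Store the in-flight exception so the waiting thread can see it.
        result.set_exception(std::current_exception());
    }
}
```

Note that futures returned by std::async already store an exception thrown by the task, so get() is enough to surface it there; the explicit promise is only needed when you manage the threads yourself. A timeout only tells you that the task has not finished. It does not cancel it, and the destructor of a future obtained from std::async still blocks until the task completes.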

Licensed under: CC-BY-SA with attribution