Question

I'm trying to write a simple multithreading example using for loops, where the threads run in lockstep like this:

Thread 1 printing 0
Thread 2 printing 0
Thread 3 printing 0
Thread 4 printing 0
Thread 1 printing 1
Thread 2 printing 1
Thread 3 printing 1
Thread 4 printing 1

That is: they all print 0, then they all wait until everyone has done that, then they all print 1, wait again for everyone, print 2, and so on.

So I wrote this code:

#include <iostream>
#include <thread>
#include <string.h>

using namespace std;

// Shared per-thread flags: thread `id` sets flags[id] to true while it prints
// and back to false when it is done with the current line.
// NOTE(review): these are plain bools written and read by several threads with
// no synchronization -- a data race, i.e. undefined behavior.
bool flags[4] = {true,true,true,true};

// Returns true if at least one entry of `flags` is true (OR over all four).
bool checkAll(){
    bool res = false;
    for(int i=0;i<4;i++){
        res = res|flags[i];
    }
    return res;
}

// Worker for thread `id`: for each of 100 rounds it marks itself busy, prints
// one line, marks itself done, then spins until no thread's flag is set.
// NOTE(review): this does not give lockstep rounds.
//  - The flags are shared without synchronization (a data race), so the
//    compiler is even allowed to read them once and spin forever.
//  - An "all flags false" moment does not mean every thread has finished the
//    current round: a thread still spinning from the previous round also has
//    its flag cleared, so a fast thread can run a round ahead of a slow one.
//  - Lines written to cout by different threads may interleave.
void printer(int id){
    for(int i=0;i<100;i++){
        flags[id] = true;
    cout << "Thread: " << id << " printing " << i << endl;
    flags[id] = false;
    while(checkAll()) {} // busy-wait while any thread's flag is still true
}
}

// Starts one printer thread for each id 0..3 and waits for all of them to
// finish before exiting.
int main(int argc, char *argv[]){
    thread t1(printer,0);
    thread t2(printer,1);
    thread t3(printer,2);
    thread t4(printer,3);

    t4.join();
    t3.join();
    t2.join();
    t1.join();

    return 0;
}

But it doesn't work as expected. As far as I can tell, it fails because of concurrency problems (several threads reading and writing the same variables).

So, I tried to solve it using condition variables:

#include <iostream>
#include <thread>
#include <string.h>
#include <mutex>
#include <condition_variable>

using namespace std;

// flags[id] is true while printer `id` is working on a round. All start true,
// which keeps the controller from signalling until every printer has cleared
// its flag after its first line.
// NOTE(review): printers write flags while holding m, but the controller reads
// them without it -- a data race.
bool flags[4] = {true,true,true,true};
mutex m;                          // held by each printer except while inside wait()
condition_variable g_queuecheck;  // controller -> printers "next round" signal
// Tells the controller loop to stop.
// NOTE(review): a plain bool written by main and read by the controller without
// synchronization (a data race); std::atomic<bool> would be needed.
bool done = false;

// Returns true if at least one entry of `flags` is true (OR over all four).
bool checkAll(){
    bool res = false;
    for(int i=0;i<4;i++){
        res = res|flags[i];
    }
    return res;
}

// Worker for thread `id`: holds m for its whole run (except while blocked in
// wait), and for each of 100 rounds marks itself busy, prints one line, marks
// itself done, then waits on g_queuecheck for the controller's signal.
// NOTE(review): this cannot enforce lockstep rounds.
//  - wait() is called without a predicate, so a spurious wakeup lets the
//    thread continue early.
//  - After a notify_all, the first thread to re-acquire m prints the next round
//    and waits again, while the other woken threads (flags still false) are
//    blocked on m. The controller sees "all false", notifies again, and the
//    fast thread can print round i+2 before the others have printed round i+1.
void printer(int id){
unique_lock<mutex> locker(m);
for(int i=0;i<100;i++){
    flags[id] = true;
    cout << "Thread: " << id << " printing " << i << endl;
    flags[id] = false;
    g_queuecheck.wait(locker); // releases m while blocked, re-acquires on wakeup
}
}

// Polls in a tight loop until `done` is set. Whenever no printer's flag is set,
// it wakes every printer waiting on g_queuecheck.
// NOTE(review): reads `flags` and `done` without holding m (data races), burns
// a CPU core while spinning, and can re-notify before every printer has printed
// the current round, because a printer that was woken but has not run yet also
// has its flag cleared.
void controller(){
while(!done){
    if(!checkAll()){
        g_queuecheck.notify_all();
    }
    }
}

// Starts the controller and four printer threads, waits for the printers to
// finish, then tells the controller to stop and joins it.
int main(int argc, char *argv[]){
thread t0(controller);
thread t1(printer,0);
thread t2(printer,1);
thread t3(printer,2);
thread t4(printer,3);

t4.join();
    t3.join();
    t2.join();
    t1.join();

    done = true; // NOTE(review): plain (non-atomic) write that the controller reads concurrently
    t0.join();

    return 0;
}

But that doesn't work either. So here are my questions: is there a way to do this as simply as in the first version? If not, what am I doing wrong in the second one? Thanks a lot.

Was it helpful?

Solution

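Your examples do not work because there are race conditions in updating and checking the flags array. The flags are shared between threads without proper synchronization. Even with locking, "all flags are false" does not mean every thread has finished the current round: a thread still waiting from the previous round also has its flag cleared, so a fast thread can run ahead.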

What you seem to want, though, is a well-known primitive called a barrier. It can be implemented, for example, using semaphores; see sections 3.6 (Barrier) and 3.7 (Reusable barrier) of The Little Book of Semaphores for details on how this works.

With a barrier, your code can be written concisely as follows (the snippet assumes the usual standard headers and `using namespace std;`):

// Number of worker threads, and the number of rounds each one prints.
const int nThreads = 4;
const int nIter = 100;

// Serializes console output between the printer threads.
mutex m;
// Shared rendezvous point for all nThreads printers.
// NOTE(review): this variable has the same name as the class `barrier` and hides
// it from here on. In a real file the class (shown further below) must be
// declared before this line. Under C++20 with `using namespace std;` the name may
// also collide with std::barrier if <barrier> is included.
barrier barrier(nThreads);


void printer(int id) {
  for (int i = 0; i < nIter; i++) {
    {  
      lock_guard<mutex> lock(m); // lock to prevent interleaved console output
      cout << "Thread: " << id << " printing " << i << endl;
    }
    barrier.wait();
  }
}

// Starts nThreads printer threads (ids 0..nThreads-1) and waits for all of
// them to finish.
int main(int argc, char **argv) {
  vector<thread> ts;
  ts.reserve(nThreads);
  for (int i = 0; i < nThreads; i++) {
    // emplace_back forwards the arguments to thread's constructor, so no
    // temporary thread object has to be built and then moved in.
    ts.emplace_back(printer, i);
  }

  // Join every worker; destroying a still-joinable thread would call
  // std::terminate.
  for (thread &t : ts) {
    t.join();
  }

  return 0;
}

Below is a simple semaphore implementation built on a mutex and a condition variable (copied from another source; the original link has not been preserved).

// Counting semaphore built from a mutex and a condition variable.
// wait() blocks until the count is positive and then decrements it;
// notify() increments the count and wakes one waiting thread.
class semaphore {
private:
    mutex mtx;             // guards count
    condition_variable cv; // signalled whenever count is incremented
    int count;

public:
    semaphore(int count_ = 0) : count(count_) {}

    // Increments the count and wakes one thread blocked in wait(), if any.
    void notify()
    {
        unique_lock<mutex> lck(mtx);
        ++count;
        cv.notify_one();
    }

    // Blocks until the count is positive, then decrements it.
    void wait()
    {
        unique_lock<mutex> lck(mtx);
        // The predicate overload re-checks the condition after spurious
        // wakeups. Testing `count > 0` (rather than `count != 0`) also makes a
        // negative initial count work: waiters stay blocked until enough
        // notify() calls have raised the count above zero.
        cv.wait(lck, [this] { return count > 0; });
        --count;
    }
};

Using that semaphore, you can implement a reusable barrier as described in the referenced book:

// Reusable barrier for exactly `n` threads, built from two semaphore
// "turnstiles". Each call to wait() blocks until all n threads have called it,
// and the same object can be used again for the next round.
class barrier {
  public:
    barrier(int n): n(n), count(0) {}

    void wait() {
      phase1();
      phase2();
    }
  private:
    mutex m;                           // guards count
    semaphore turnstile1, turnstile2;  // default-constructed with count 0 (closed)
    int n, count;

    // Arrival: the n-th thread to arrive hands out n passes through turnstile1.
    void phase1() {
      {
        // lock_guard releases m even if notify() throws; the previous manual
        // lock()/unlock() pair would have left m locked forever.
        lock_guard<mutex> lock(m);
        count++;
        if (count == n) {
          for (int i = 0; i < n; i++)
            turnstile1.notify();
        }
      }
      turnstile1.wait();
    }

    // Departure: the last thread to leave hands out n passes through
    // turnstile2, so no thread returns from wait() (and starts the next round)
    // until every thread has passed turnstile1 for this round.
    void phase2() {
      {
        lock_guard<mutex> lock(m);
        count--;
        if (count == 0) {
          for (int i = 0; i < n; i++)
            turnstile2.notify();
        }
      }
      turnstile2.wait();
    }
};

OTHER TIPS

This does what you want. It also worked without the atomic, but I added it anyway for good measure. :)

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>

const size_t num_threads = 10;
const size_t num_reps = 10;

std::mutex m;
std::atomic_int pos;

// Prints "Thread: <id> printing <round>" num_reps times, taking turns with the
// other threads in id order. Thread `id` may only print when pos == id, and
// after printing it hands the turn to (id + 1) % num_threads.
void printer(int id)
{
    size_t round = 0;
    while (round < num_reps)
    {
        {
            std::lock_guard<std::mutex> l(m);
            if (pos.load() == id)
            {
                std::cout << "Thread: " << id << " printing " << round << std::endl;
                pos.store(static_cast<int>((id + 1) % num_threads));
                // Advance only when this thread actually printed. If a failed
                // poll consumed a round, the thread would skip rounds and could
                // exit while the turn is still assigned to it, stalling everyone.
                ++round;
            }
        }
        // Not our turn (or just finished one): let other threads run before
        // polling again.
        std::this_thread::yield();
    }
}

int main()
{
    m.lock();
    pos.store(0);
    std::vector<std::thread> v;
    for(int i = 0; i < num_threads; ++i)
    {
        v.emplace_back(std::thread(printer, i));
    }
    m.unlock();

    bool done;
    do
    {
        done = true;
        for(int i = 0; i < num_threads; ++i)
        {
            if(v[i].joinable())
            {
                done = false;
                v[i].join();
            }
        }
    }
    while(!done);

    return 0;
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top