Question

I have a program with several worker threads and a main thread that receives jobs. In the main thread I want to put the jobs on a synchronized queue and have the worker threads all block waiting on it. When a job arrives, one worker should pull it from the queue while the remaining workers keep waiting for the next job.

I found CreateMsgQueue (http://msdn.microsoft.com/en-us/library/ms885180.aspx); however, it appears to exist only on Windows CE.

I know I could write this myself, but if something already exists I'd be a fool not to use it.

I am developing in C++ using Visual Studio 2005.

Any suggestions gratefully received.

Thanks Rich


Solution

Windows doesn't provide exactly what you want. What it does provide is thread pools (for example, through QueueUserWorkItem): with these, you don't have to create the queue yourself, and you don't have to create or (directly) manage the threads either.

Of course, synchronized queues do exist too, just not as part of Windows. Here is one I wrote, a fixed-capacity queue built on two semaphores and a critical section:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
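    long in_pos, out_pos;

    // Not copyable: a copy would share (and double-close) the handles.
    queue(const queue&);
    queue& operator=(const queue&);
public: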
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    // Blocks while the queue is full.
    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);  // claim an empty slot
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);       // wake one waiting consumer
    }

    // Blocks while the queue is empty.
    T pop() { 
        WaitForSingleObject(data_avail, INFINITE);   // claim a full slot
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);      // free the slot for producers
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif
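
If a C++11 compiler is available (an assumption; Visual Studio 2005 predates it), the same bounded-queue idea can be sketched portably with std::mutex and std::condition_variable in place of the semaphores and critical section. This is not the code above, just a rough equivalent. The names blocking_queue and run_workers are made up for illustration, as is the convention that a job value of 0 tells a worker to stop.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Bounded blocking queue: push() waits while full, pop() waits while empty.
template <class T>
class blocking_queue {
    std::mutex mutex_;                    // protects items_
    std::condition_variable not_full_;    // signalled when a slot frees up
    std::condition_variable not_empty_;   // signalled when an item arrives
    std::queue<T> items_;
    const std::size_t capacity_;
public:
    explicit blocking_queue(std::size_t capacity = 256) : capacity_(capacity) {}

    void push(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(value));
        lock.unlock();
        not_empty_.notify_one();          // wake one waiting consumer
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        lock.unlock();
        not_full_.notify_one();           // wake one waiting producer
        return value;
    }
};

// Hypothetical demo: the calling thread queues jobs 1..job_count while
// worker_count workers pop them and "process" them by adding them up.
// A job of 0 is an illustrative stop signal; one is sent per worker.
long run_workers(int worker_count, int job_count) {
    blocking_queue<int> jobs(8);          // small capacity so push() can block
    std::mutex total_mutex;
    long total = 0;

    std::vector<std::thread> workers;
    for (int i = 0; i < worker_count; ++i) {
        workers.emplace_back([&] {
            for (;;) {
                int job = jobs.pop();     // blocks until a job arrives
                if (job == 0) break;      // stop signal
                std::lock_guard<std::mutex> lock(total_mutex);
                total += job;
            }
        });
    }

    for (int job = 1; job <= job_count; ++job) jobs.push(job);
    for (int i = 0; i < worker_count; ++i) jobs.push(0);
    for (auto& w : workers) w.join();     // wait for every worker to finish
    return total;
}
```

Calling run_workers(4, 100) from main starts four workers that share the queue. Each job is handed to exactly one worker, and idle workers simply block in pop(), which is the behaviour the question asks for.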
Licensed under: CC-BY-SA with attribution