Question

The only boost::lockfree that I've made work is spsc_queue, and it's amazing.

However, I'd like to implement it where one thread passes information back and forth with cores - 1 threads.

I was thinking that each of the worker threads would have its own set of spsc_queues, in and out, which would be stored in vectors where the main thread would pass information to one outgoing queue and then move to the next queue in the vector and so on as well as cycle through the incoming queues.

Can these spsc_queues in the two vectors be pushed and popped safely?

If not, is there an alternative way to use spsc_queues according to my intent?


Solution

You're basically proposing to use 2x(cores-1) spsc_queues in their intended fashion. Yes, this would work.

I don't see an obvious way to handle the responses ("incoming queues") on the main thread, though. Bear in mind there is no "waiting" operation on the incoming queue, and you wouldn't want one either (it wouldn't be very lock-free any more, and you would leave all the other workers unserviced while waiting for an incoming message).

Aside: If you dimension your response queues such that they will never overflow, then you could get a long way with naive round-robin reading from them (just don't attempt to drain all messages from a single response queue before moving on, because this is a sure-fire way to starve the other response queues).
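A minimal sketch of that round-robin read, under stated assumptions: `ResponseQueue` is a single-threaded stand-in built on `std::deque` (with `boost::lockfree::spsc_queue` you would forward `try_pop` to its `pop`), and `poll_round_robin` is a hypothetical helper name. The point is only that each pass takes at most one message per queue.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Stand-in for one response queue (assumption: not thread-safe, for
// illustration only). A real version would wrap an spsc_queue.
struct ResponseQueue {
    std::deque<std::string> items;

    bool try_pop(std::string& out) {
        if (items.empty()) return false;
        out = std::move(items.front());
        items.pop_front();
        return true;
    }
};

// Visit every queue once, starting at `cursor`, and take at most ONE
// message from each. Advances `cursor` so the next pass starts one
// queue further along; no single queue can monopolize the reader.
std::vector<std::string> poll_round_robin(std::vector<ResponseQueue>& queues,
                                          std::size_t& cursor) {
    std::vector<std::string> received;
    if (queues.empty()) return received;

    std::string msg;
    for (std::size_t i = 0; i < queues.size(); ++i) {
        ResponseQueue& q = queues[(cursor + i) % queues.size()];
        if (q.try_pop(msg)) received.push_back(msg);
    }
    cursor = (cursor + 1) % queues.size();
    return received;
}
```

Calling `poll_round_robin` repeatedly from the main loop services a busy queue one message at a time, so a quiet queue's single response is never stuck behind a long backlog elsewhere.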

Code sample at the bottom (CODE SAMPLE)

All of this leads me to strongly suspect that you are actually after asynchrony, as opposed to concurrency. I have the feeling you would be very happy to get your application running on 1 thread, just servicing each available message - whatever its source or content - "as soon as possible".

  1. This model will scale very well for a large number of small messages that can be processed in very little time [¹].
  2. When 1 thread is saturated, you could scale out by adding workers.
  3. In services that have messages that require significantly longer processing, you could offload these tasks on dedicated threads that only handle the low-frequency requests, in an asynchronous manner: they can just push small "completion" messages back onto the main work queue once they're done.
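The three points above can be sketched roughly as follows. This is an illustration, not a lock-free design: `WorkQueue` is a hypothetical mutex-and-condition-variable queue, `run_event_loop` is an assumed name, and the `"slow:"` / `"done:"` prefixes are invented here purely to mark which messages get offloaded and which are completions.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// A small blocking queue; the main loop sleeps here until work arrives.
class WorkQueue {
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::string> items_;
public:
    void push(std::string msg) {
        {
            std::lock_guard<std::mutex> lk(m_);
            items_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }
    std::string pop() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !items_.empty(); });
        std::string msg = std::move(items_.front());
        items_.pop_front();
        return msg;
    }
};

// Services `requests` on the calling thread. A request starting with
// "slow:" is handed to a dedicated thread, which posts a small "done:"
// completion message back onto the same queue when it finishes.
// Returns the events the main loop handled, in order.
std::vector<std::string> run_event_loop(const std::vector<std::string>& requests) {
    WorkQueue queue;
    for (const auto& r : requests) queue.push(r);

    std::vector<std::thread> slow_threads;
    std::vector<std::string> handled;
    std::size_t pending = requests.size();  // events still to be handled

    while (pending > 0) {
        std::string msg = queue.pop();
        if (msg.rfind("slow:", 0) == 0) {
            // Offload: `pending` stays the same, because the request is
            // replaced by exactly one completion message still to come.
            slow_threads.emplace_back([&queue, msg] {
                // ... long-running work would happen here ...
                queue.push("done:" + msg.substr(5));
            });
        } else {
            handled.push_back(msg);  // quick message or completion
            --pending;
        }
    }
    for (auto& t : slow_threads) t.join();
    return handled;
}
```

The main thread never blocks on a slow task; it only ever sees short messages, including the completions.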

All of this would lead me to think of a library like libuv or Boost Asio. If you already know off-hand that you'll need to be running lock-less to get the throughput you need (this is quite rare outside industrial-strength server solutions), you could emulate the same using lock-less queues. This is much more work, because you'll have to integrate an epoll/select/poll loop into your producers. I suggest you keep it simple and only adopt additional complexity as you actually need it.

Mantra: correct, well-factored first; optimize later

(Note the "well-factored" there. In this case it means you will not allow slow processing tasks on your high-throughput queues.)

CODE SAMPLE

As promised, here is a simple proof of concept that shows bidirectional SPSC queue messaging between the main thread and several worker threads.

Completely lockfree version: Live On Coliru

There are quite a few subtleties here. In particular, note how under-dimensioning the queues will lead to silently dropped messages. This won't happen if the consumers can keep up with the producers, but as long as there is other OS activity you can't know that they will, so you should add checks for it.
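One way such a check might look, as a sketch: the essential part is never ignoring the `bool` returned by `push`. `BoundedQueue` here is a single-threaded stand-in with the same push contract as a fixed-capacity queue, and `SendStats` and `checked_push` are hypothetical names introduced for illustration.

```cpp
#include <cstddef>
#include <deque>
#include <string>

// Stand-in for a fixed-capacity queue (assumption: single-threaded,
// illustration only). Like a bounded SPSC queue, push() reports
// failure instead of blocking when the queue is full.
struct BoundedQueue {
    std::size_t capacity;
    std::deque<std::string> items;

    bool push(const std::string& s) {
        if (items.size() >= capacity) return false;
        items.push_back(s);
        return true;
    }
};

struct SendStats {
    std::size_t sent = 0;
    std::size_t dropped = 0;
};

// Push and record the outcome, so a full queue shows up in the
// statistics instead of silently losing the message.
template <typename Queue>
bool checked_push(Queue& q, const std::string& msg, SendStats& stats) {
    if (q.push(msg)) {
        ++stats.sent;
        return true;
    }
    ++stats.dropped;
    return false;
}
```

A caller can then decide what a failed push means for it: retry later, count it as congestion, or report an error.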

UPDATE As per request in the comments, here's a version that checks for queue saturation - without dropping messages. See it Live On Coliru too.

  • No messages can be dropped
  • There are no more late arrivals (since the main loop doesn't exit until all responses have been received)
  • The round robin is no longer tied to the loop variable, because the sending might stall, which would result in reading the same response queue all the time. That is a recipe for deadlock or other worst-case performance.
  • In case of saturated queues, we had to think of a proper way to balance the load. I opted for a tiny sleep. Technically, this means that our lock-free wait-free solution degrades to regular co-operative multithreading when queues are saturated. Perhaps you would prefer to grow the queues if this is detected. It all depends on your system.
  • You will want to know when this happens; I've included simple congestion statistics for all threads. On my system, with microsleep call sleep_for(nanoseconds(1)), the output is:

    Received 1048576 responses (97727 100529 103697 116523 110995 115291 103048 102611 102583 95572 )
    
    Total: 1048576 responses/1048576 requests
    Main thread congestion: 21.2%
    Worker #0 congestion: 1.7%
    Worker #1 congestion: 3.1%
    Worker #2 congestion: 2.0%
    Worker #3 congestion: 2.5%
    Worker #4 congestion: 4.5%
    Worker #5 congestion: 2.5%
    Worker #6 congestion: 3.0%
    Worker #7 congestion: 3.2%
    Worker #8 congestion: 3.1%
    Worker #9 congestion: 3.6%
    
    real    0m0.616s
    user    0m3.858s
    sys 0m0.025s
    

    As you can see, the tuning on Coliru needed to be drastically different. This tuning would be required whenever your system runs a risk of running at maximum load.

  • Conversely, you'd have to think of how to throttle the load when a queue is empty: at this moment, workers will just busy-loop on the queue, waiting for messages to appear. In a real server environment, when loads happen in bursts, you will want to detect "idle" periods and reduce the polling frequency so as to conserve CPU power (at the same time allowing the CPU to maximize throughput on other threads).
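The idle throttling mentioned in the last bullet could be sketched as a staged backoff: spin for a while, then yield, then sleep. `IdleBackoff` is a hypothetical helper, and the thresholds and the 50 microsecond sleep are arbitrary assumptions that would need tuning on a real system.

```cpp
#include <chrono>
#include <cstddef>
#include <thread>

// Assumed thresholds; tune for your workload.
constexpr std::size_t kSpinLimit  = 64;   // idle polls spent busy-spinning
constexpr std::size_t kYieldLimit = 256;  // idle polls before sleeping

class IdleBackoff {
    std::size_t idle_polls_ = 0;
public:
    enum class Action { Spin, Yield, Sleep };

    // Call after a successful pop: go back to low-latency spinning.
    void reset() { idle_polls_ = 0; }

    // Decide what to do for one more empty poll.
    Action next() {
        ++idle_polls_;
        if (idle_polls_ <= kSpinLimit)  return Action::Spin;
        if (idle_polls_ <= kYieldLimit) return Action::Yield;
        return Action::Sleep;
    }

    // Call after an empty poll: escalate from spinning to yielding
    // to sleeping the longer the queue stays empty.
    void wait() {
        switch (next()) {
            case Action::Spin:
                break;  // retry immediately
            case Action::Yield:
                std::this_thread::yield();
                break;
            case Action::Sleep:
                std::this_thread::sleep_for(std::chrono::microseconds(50));
                break;
        }
    }
};
```

In a worker loop this would be used by calling `reset()` whenever `pop` succeeds and `wait()` whenever it returns false, which keeps latency low during bursts and frees the core during quiet periods.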

Included in this answer is the second, "hybrid" version (lock-free until queue saturation):

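#include <boost/atomic.hpp>               // boost::atomic_bool
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/thread.hpp>
#include <cstdlib>                        // rand
#include <iomanip>                        // std::setprecision
#include <iostream>
#include <iterator>
#include <memory>
#include <numeric>                        // std::accumulate
#include <string>
#include <vector>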

namespace blf = boost::lockfree;

static boost::atomic_bool shutdown(false);

static void nanosleep()
{
    //boost::this_thread::yield();
    boost::this_thread::sleep_for(boost::chrono::nanoseconds(1));
}

struct Worker
{
    typedef blf::spsc_queue<std::string > queue;
    typedef std::unique_ptr<queue> qptr;
    qptr incoming, outgoing;
    size_t congestion = 0;

    Worker() : incoming(new queue(64)), outgoing(new queue(64)) 
    {
    }

    void operator()()
    {
        std::string request;
        while (!shutdown)
        {
            while (incoming->pop(request)) 
                while (!outgoing->push("Ack: " + request))
                    ++congestion, nanosleep();
        }
    }
};

int main()
{
    boost::thread_group g;

    std::vector<Worker> workers(10);
    std::vector<size_t> responses_received(workers.size());

    for (auto& w : workers)
        g.create_thread(boost::ref(w));

    // let's give them something to do
    const auto num_requests = (1ul<<20);
    std::string response;
    size_t congestion = 0;

    for (size_t total_sent = 0, total_received = 0; total_sent < num_requests || total_received < num_requests;)
    {
        if (total_sent < num_requests)
        {
            // send to a random worker
            auto& to = workers[rand() % workers.size()];
            if (to.incoming->push("request " + std::to_string(total_sent)))
                ++total_sent;
            else
                congestion++;
        }

        if (total_received < num_requests)
        {
            static size_t round_robin = 0;
            auto from = (++round_robin) % workers.size();
            if (workers[from].outgoing->pop(response))
            {
                ++responses_received[from];
                ++total_received;
            }
        }
    }

    auto const sum = std::accumulate(begin(responses_received), end(responses_received), size_t());
    std::cout << "\nReceived " << sum << " responses (";
    std::copy(begin(responses_received), end(responses_received), std::ostream_iterator<size_t>(std::cout, " "));
    std::cout << ")\n";

    shutdown = true;
    g.join_all();

    std::cout << "\nTotal: " << sum << " responses/" << num_requests << " requests\n";

    std::cout << "Main thread congestion: " << std::fixed << std::setprecision(1) << (100.0*congestion/num_requests) << "%\n";

    for (size_t idx = 0; idx < workers.size(); ++idx)
        std::cout << "Worker #" << idx << " congestion: " << std::fixed << std::setprecision(1) << (100.0*workers[idx].congestion/responses_received[idx]) << "%\n";
}

[¹] "very little time" being, as ever, a relative notion that roughly means "less time than the average time between new messages". E.g. if you have 100 requests per second, then 5 ms processing time would be "very little" for a single-threaded system. However, if you have 10k requests per second, 1 ms processing time would be about the limit on a 16-core server.
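The arithmetic behind those figures can be sketched as a back-of-the-envelope budget. It assumes messages are spread evenly over the cores and ignores queueing and scheduling overhead; `budget_ms` is a hypothetical helper name.

```cpp
// Rough upper bound on average processing time per message, in
// milliseconds: with `cores` threads working independently, the system
// only keeps up while processing time <= cores / requests_per_second.
double budget_ms(double requests_per_second, unsigned cores) {
    return 1000.0 * cores / requests_per_second;
}
```

With 100 requests per second on one thread this gives 10 ms, so 5 ms leaves headroom; with 10k requests per second on 16 cores it gives 1.6 ms, so 1 ms is indeed close to the limit.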

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow