Question

I have been building a high-throughput server application for multimedia messaging, implemented in C++. Each server can run in stand-alone mode, or many servers can be joined together to form a DHT-based overlay network; the servers then act as super-peers, as in Skype.

The work is in progress. Currently the server can handle around 200,000 messages per second (256 byte messages) and has a max throughput of around 256 MB/s on my machine (Intel i3 Mobile 2 GHz, Fedora Core 18 (64-bit), 4 GB RAM) for messages of length 4096 bytes. The server has got two threads, one thread for handling all IOs (epoll-based, edge triggered) another one for processing the incoming messages. There is another thread for overlay management, but it doesn't matter in the current discussion.

The two threads in question share data through two circular buffers. Thread #1 enqueues fresh messages for thread #2 in one circular buffer, while thread #2 returns the processed messages through the other. The server is completely lock-free: I am not using any synchronization primitive whatsoever, not even atomic operations.

The circular buffers never overflow, because the messages are pooled (pre-allocated at startup). In fact, all vital or frequently used data structures are pooled to reduce memory fragmentation and to increase cache efficiency. We therefore know, when the server starts, the maximum number of messages we will ever create, so we can pre-calculate the maximum size of the buffers and initialize the circular buffers accordingly.

Now my question: thread #1 enqueues the serialized messages one at a time (actually pointers to message objects), while thread #2 takes messages out of the queue in chunks (of 32/64/128) and returns the processed messages in chunks through the second circular buffer. When there are no new messages, thread #2 busy-waits, keeping one of the CPU cores busy all the time. How can I improve the design further? What are the alternatives to the busy-wait strategy? I want to do this elegantly and efficiently. I have considered semaphores, but I fear they are not the best solution, for the simple reason that thread #1 would have to call "sem_post" every time it enqueues a message, which might considerably decrease throughput, and the second thread would have to call "sem_wait" an equal number of times to keep the semaphore from overflowing. I also fear that a semaphore implementation might use a mutex internally.

The second good option might be to use a signal, if I can find an algorithm that raises the signal only when the second thread has either "emptied the queue and is in the process of calling sigwait" or is "already waiting on sigwait". In short, the signal must be raised the minimum number of times, although it won't hurt if it is raised a few more times than needed. Yes, I did search Google, but none of the solutions I found on the Internet were satisfactory. Here are a few considerations:

A. The server must waste minimum CPU cycles while making system calls, and system calls must be used a minimum number of times.

B. There must be very low overhead and the algorithm must be efficient.

C. No locking whatsoever.

I want all options to be put on the table.

Here is the link to the site where I have shared info about my server, to better understand the functionality and the purpose: www.wanhive.com


Solution

Busy waiting is good if you need to wake up thread #2 as fast as possible. In fact, it is the fastest way to notify one processor about changes made by another. You need memory fences on both ends (a write fence on one side and a read fence on the other). But this holds true only if both of your threads run on dedicated processors. In that case no context switch is needed, just cache-coherency traffic.
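As a rough illustration of the fences mentioned above, here is a minimal sketch of a single-producer/single-consumer ring in which the index updates carry release/acquire ordering. The class name `SpscRing` and its members are hypothetical, not taken from the asker's server, and the sketch assumes C++11 `std::atomic` is available and that the capacity is a power of two.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical SPSC ring: exactly one thread calls try_push, exactly one
// other thread calls try_pop.
template <typename T>
class SpscRing {
public:
    explicit SpscRing(std::size_t capacity_pow2)
        : slots_(capacity_pow2), mask_(capacity_pow2 - 1) {
        // Capacity must be a non-zero power of two so that (index & mask_) wraps.
        assert(capacity_pow2 != 0 && (capacity_pow2 & mask_) == 0);
    }

    // Producer side only. Returns false when the ring is full.
    bool try_push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t head = head_.load(std::memory_order_acquire);
        if (tail - head == slots_.size()) return false;
        slots_[tail & mask_] = value;
        // Release ("write fence"): the slot is written before the new tail is visible.
        tail_.store(tail + 1, std::memory_order_release);
        return true;
    }

    // Consumer side only. Returns false when the ring is empty.
    bool try_pop(T& out) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        // Acquire ("read fence"): pairs with the release store in try_push.
        const std::size_t tail = tail_.load(std::memory_order_acquire);
        if (head == tail) return false;
        out = slots_[head & mask_];
        head_.store(head + 1, std::memory_order_release);
        return true;
    }

private:
    std::vector<T> slots_;
    const std::size_t mask_;
    alignas(64) std::atomic<std::size_t> head_{0};  // separate cache lines,
    alignas(64) std::atomic<std::size_t> tail_{0};  // assuming 64-byte lines
};
```

A busy-waiting consumer would simply loop on `try_pop`. On x86 the release and acquire operations typically compile to plain loads and stores, so the cost over an unsynchronized buffer should be small, though that is worth measuring rather than assuming.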

Some improvements can be made.

  1. If thread #2 is generally CPU-bound and busy-waits, the scheduler may penalize it (at least on Windows and Linux). The OS scheduler dynamically adjusts thread priorities to improve overall system performance: it lowers the priority of CPU-bound threads that consume large amounts of CPU time in order to prevent starvation of other threads. You need to raise the priority of thread #2 manually to prevent this.
  2. If you have a multicore or multiprocessor machine, you will end up undersubscribing the processors, and your application won't be able to exploit hardware concurrency. You can mitigate this by using several processing threads (several instances of thread #2).

Parallelizing the processing step. There are two cases.

  1. Your messages are totally ordered and need to be processed in the order they arrived.
  2. Messages can be reordered. Processing can be done in any order.

In the first case you need N cyclic buffers, N processing threads, N output buffers, and one consumer thread. Thread #1 enqueues messages into the cyclic buffers in round-robin order.

// Thread #1 pseudocode
auto message = recv();
auto buffer_index = atomic_increment(&message_counter);
buffer_index = buffer_index % N;  // N is the number of processing threads
// buffers is an array of cyclic buffers - Buffer* buffers[N];
Buffer* current_buffer = buffers[buffer_index];
current_buffer->enqueue(message);

Each processing thread consumes messages from one of the buffers and enqueues the result into its dedicated output buffer.

// Thread #i pseudocode
auto message = my_buffer->dequeue();
auto result = process(message);
my_output_buffer->enqueue(result);

Now you need to handle all these results in arrival order. You can do this with a dedicated consumer thread that dequeues processed messages from the output cyclic buffers in round-robin order.

// Consumer thread pseudocode
// out_message_counter is equal to message_counter at start
auto out_buffer_index = atomic_increment(&out_message_counter);
out_buffer_index = out_buffer_index % N;
// out_buffers is the array of output buffers used by the processing
// threads
auto out_buffer = out_buffers[out_buffer_index];
auto result = out_buffer->dequeue();
send(result);  // or whatever you need to do with result
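To make the ordering argument concrete, here is a small self-contained sketch of the round-robin fan-out and fan-in described above. Everything in it is illustrative: `Lane` is a hypothetical one-writer/one-reader queue sized for the whole run (so it needs no wrap-around), `process_in_order` is a made-up name, doubling an integer stands in for `process(message)`, and the calling thread plays both thread #1 and the consumer thread for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical one-writer/one-reader lane with storage for the whole run.
struct Lane {
    explicit Lane(std::size_t n) : items(n) {}

    void push(int v) {  // writer thread only
        const std::size_t w = written.load(std::memory_order_relaxed);
        items[w] = v;
        written.store(w + 1, std::memory_order_release);
    }
    int pop_blocking() {  // reader thread only
        while (written.load(std::memory_order_acquire) == read)
            std::this_thread::yield();  // wait until the writer publishes
        return items[read++];
    }

    std::vector<int> items;
    std::atomic<std::size_t> written{0};
    std::size_t read = 0;
};

// Distributes input over n workers round-robin and collects results in the
// same round-robin order, which restores the arrival order.
std::vector<int> process_in_order(const std::vector<int>& input, std::size_t n) {
    assert(n > 0);
    const std::size_t per_lane = input.size() / n + 1;
    std::vector<std::unique_ptr<Lane>> in, out;
    for (std::size_t i = 0; i < n; ++i) {
        in.emplace_back(new Lane(per_lane));
        out.emplace_back(new Lane(per_lane));
    }
    std::vector<std::thread> workers;
    for (std::size_t w = 0; w < n; ++w) {
        // Number of messages lane w receives under round-robin distribution.
        const std::size_t count =
            input.size() / n + (w < input.size() % n ? 1 : 0);
        workers.emplace_back([&in, &out, w, count] {
            for (std::size_t k = 0; k < count; ++k)
                out[w]->push(in[w]->pop_blocking() * 2);  // stand-in for process()
        });
    }
    for (std::size_t i = 0; i < input.size(); ++i)  // role of thread #1
        in[i % n]->push(input[i]);
    std::vector<int> result;
    result.reserve(input.size());
    for (std::size_t i = 0; i < input.size(); ++i)  // role of the consumer thread
        result.push_back(out[i % n]->pop_blocking());
    for (auto& t : workers) t.join();
    return result;
}
```

Because message i always goes to lane i % n and is collected from output lane i % n, the results come out in arrival order no matter which worker finishes first. In a real server the producer and consumer would be separate threads and the lanes would be bounded rings.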

In the second case, when you don't need to preserve message order, you don't need the consumer thread or the output cyclic buffers. You just do whatever you need to do with the result in the processing thread.

N should equal the number of CPUs minus 3 in the first case (one I/O thread, one consumer thread, and one DHT thread) and the number of CPUs minus 2 in the second case (one I/O thread and one DHT thread). This is because busy waiting can't be effective if the processors are oversubscribed.
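The sizing rule can be written down as a small helper. This is only a sketch: the function names are hypothetical, and falling back to a single worker when too few CPUs are available (or when `std::thread::hardware_concurrency()` reports 0, meaning "unknown") is an assumption of this example, not part of the rule above.

```cpp
#include <thread>

// Number of processing threads for a given hardware thread count.
// ordered == true reserves I/O + consumer + DHT threads (3);
// ordered == false reserves I/O + DHT threads (2).
unsigned worker_count(unsigned hardware_threads, bool ordered) {
    const unsigned reserved = ordered ? 3u : 2u;
    // Assumption: never go below one worker, even if that oversubscribes.
    if (hardware_threads <= reserved) return 1u;
    return hardware_threads - reserved;
}

// Convenience wrapper that asks the runtime for the hardware thread count.
unsigned default_worker_count(bool ordered) {
    return worker_count(std::thread::hardware_concurrency(), ordered);
}
```

On the asker's machine the fallback branch would likely be taken, in which case busy waiting in the workers will compete with the other threads for cores.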

OTHER TIPS

Sounds like you want to coordinate a producer and consumer connected by some shared state. At least in Java for such patterns, one way to avoid busy wait is to use wait and notify. With this approach, thread #2 can go into a blocked state if it finds that the queue is empty by calling wait and avoid spinning the CPU. Once thread #1 puts some stuff in the queue, it can do a notify. A quick search of such mechanisms in C++ yields this:

wait and notify in C/C++ shared memory

You can have thread #2 go to sleep for X milliseconds when the queue is empty.

X can be determined by the length of the queues you want + some guard band.
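One way to combine this with busy waiting is to spin for a bounded number of checks and only then sleep. The sketch below assumes a hypothetical `has_work` callable that reports whether the queue is non-empty; `wait_for_work`, `spin_limit`, and `nap` are illustrative names and values, not tuned recommendations.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Polls has_work() up to spin_limit times, then sleeps for `nap` and repeats.
// Returns how many times the thread slept before work showed up.
template <typename HasWork>
unsigned wait_for_work(HasWork has_work, unsigned spin_limit,
                       std::chrono::microseconds nap) {
    assert(spin_limit > 0);  // with 0 the predicate would never be checked
    unsigned naps = 0;
    for (;;) {
        for (unsigned i = 0; i < spin_limit; ++i)
            if (has_work()) return naps;  // fast path: no system call
        std::this_thread::sleep_for(nap);  // slow path: give up the core
        ++naps;
    }
}
```

As a rough sizing guide using the figure from the question, at 200,000 messages per second a nap of 1 ms lets about 200 messages accumulate, so the queue needs at least that much headroom plus the guard band. Actual sleep durations are usually longer than requested, which is one more reason for the guard band.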

BTW, in user mode (ring3) you can't use MONITOR/MWAIT instructions which would be ideal for your question.

Edit

You should definitely give TBB's RWlock a try (there's a free version). Sounds like what you're looking for.

Edit2

Another option is to use condition variables. They involve a mutex and a condition. Basically, you wait for the condition to become "true". The low-level pthread stuff can be found here.
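A condition variable does not have to be signalled on every enqueue. The following sketch, using the C++11 wrappers rather than raw pthreads, lets the producer skip both the mutex and the notify call unless the consumer has announced that it is about to sleep. `WakeGate` and `demo_handoff` are hypothetical names, and the sketch assumes the predicate reads atomic state (here an `std::atomic<int>`). Note that it does take a mutex on the slow path, so it relaxes the asker's "no locking" constraint.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

class WakeGate {
public:
    // Consumer: blocks until has_work() returns true.
    template <typename HasWork>
    void wait(HasWork has_work) {
        std::unique_lock<std::mutex> lock(mutex_);
        sleeping_.store(true);
        // Full fence: announce "sleeping" before re-checking for work.
        std::atomic_thread_fence(std::memory_order_seq_cst);
        cv_.wait(lock, has_work);
        sleeping_.store(false);
    }

    // Producer: call after publishing an item. Returns true if it notified.
    bool notify_if_sleeping() {
        // Full fence: the published item is ordered before the flag check.
        std::atomic_thread_fence(std::memory_order_seq_cst);
        if (!sleeping_.load()) return false;  // fast path, no lock taken
        // Taking the mutex waits until the consumer is really inside wait().
        { std::lock_guard<std::mutex> lock(mutex_); }
        cv_.notify_one();
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<bool> sleeping_{false};
};

// Usage sketch: one consumer sleeps until the producer publishes one item.
int demo_handoff() {
    WakeGate gate;
    std::atomic<int> pending{0};
    int seen = 0;
    std::thread consumer([&] {
        gate.wait([&] { return pending.load() > 0; });
        seen = pending.load();
    });
    pending.store(1);           // publish first...
    gate.notify_if_sleeping();  // ...then wake the consumer only if it sleeps
    consumer.join();
    return seen;
}
```

The two fences are what prevent a lost wake-up: either the consumer's re-check sees the new item, or the producer sees the sleeping flag. In practice the consumer would spin for a while first and call `wait` only after the queue has stayed empty, so the producer's common case stays a single flag check.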

Licensed under: CC-BY-SA with attribution