Question

In a high volume multi-threaded java project I need to implement a non-blocking buffer.

In my scenario I have a web layer that receives ~20,000 requests per second. I need to accumulate some of those requests in some data structure (aka the desired buffer) and when it is full (let's assume it is full when it contains 1000 objects) those objects should be serialized to a file that will be sent to another server for further processing.

The implementation should be non-blocking. I examined ConcurrentLinkedQueue, but I'm not sure it fits the job.

I think I need to use two queues, so that once the first one is full it is replaced by a fresh one, and the full queue ("the first") gets handed off for further processing. That is the basic idea I have at the moment, but I still don't know whether it is feasible, since I'm not sure I can switch pointers in Java (in order to swap out the full queue).

Any advice?

Thanks

Solution

What I usually do with requirements like this is create a pool of buffers at app startup and store the references in a BlockingQueue. The producer thread pops buffers, fills them and then pushes the refs to another queue upon which the consumers are waiting. When the consumer(s) are done (data written to file, in your case), the refs get pushed back onto the pool queue for re-use. This provides lots of buffer storage, no need for expensive bulk copying inside locks, eliminates GC actions, provides flow control (if the pool empties, the producer is forced to wait until some buffers are returned) and prevents memory runaway, all in one design.

More: I've used such designs for many years in various other languages too (C++, Delphi), and they work well. I have an 'ObjectPool' class that contains the BlockingQueue and a 'PooledObject' class to derive the buffers from. PooledObject has an internal private reference to its pool (it gets initialized on pool creation), allowing a parameterless release() method. This means that, in complex designs with more than one pool, a buffer always gets released to the correct pool, reducing cockup potential.
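A minimal Java sketch of that pool idea (the exact class layout here is an assumption for illustration, not the original code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Buffers keep a private reference to their pool, so release() needs no arguments.
class PooledObject {
    private ObjectPool pool;                    // set once, when the pool is created
    final Object[] items = new Object[1000];    // the actual buffer storage
    int count = 0;

    void setPool(ObjectPool pool) { this.pool = pool; }

    void release() {                            // always returns to the correct pool
        count = 0;
        pool.queue.offer(this);
    }
}

class ObjectPool {
    final BlockingQueue<PooledObject> queue = new LinkedBlockingQueue<>();

    ObjectPool(int buffers) {
        for (int i = 0; i < buffers; i++) {
            PooledObject buf = new PooledObject();
            buf.setPool(this);
            queue.add(buf);
        }
    }

    // The producer blocks here when the pool is empty; that is the flow control.
    PooledObject take() throws InterruptedException {
        return queue.take();
    }

    int level() { return queue.size(); }        // handy for monitoring
}

A producer would take() a buffer, fill its items, push the buffer onto a second BlockingQueue that the file-writing consumer reads from, and the consumer would call release() once the file is written.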

Most of my apps have a GUI, so I usually dump the pool level to a status bar on a timer, every second, say. I can then see roughly how much loading there is, whether any buffers are leaking (the number consistently goes down and the app eventually deadlocks on an empty pool), or whether I am double-releasing (the number consistently goes up and the app eventually crashes).
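Without a GUI, the same check can be done with a scheduled task; a small sketch, assuming the hypothetical ObjectPool above:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Print the pool level once a second: a number that keeps falling suggests a
// leaked buffer, one that keeps rising suggests a double release.
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(
        () -> System.out.println("pool level: " + pool.level()),
        1, 1, TimeUnit.SECONDS);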

It's also fairly easy to change the number of buffers at runtime, by either creating more and pushing them into the pool, or by waiting on the pool, removing buffers and letting GC destroy them.
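With the same hypothetical pool, resizing at runtime could look roughly like this:

// Grow the pool: create extra buffers and hand them over.
for (int i = 0; i < 100; i++) {
    PooledObject buf = new PooledObject();
    buf.setPool(pool);
    pool.queue.offer(buf);
}

// Shrink the pool: pull buffers out and let the GC reclaim them.
for (int i = 0; i < 100; i++) {
    pool.queue.poll();   // returns null if the pool happens to be empty right now
}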

OTHER TIPS

I think you have a very good point with your solution. You would need two queues: the processingQueue would be the buffer size you want (1000 in your example), while the waitingQueue would be a lot bigger. Every time the processingQueue is full, it writes its contents to the specified file and then grabs the first 1000 elements from the waitingQueue (or fewer, if the waiting queue has fewer than 1000).

My only concern about this is that you mention 20,000 requests per second and a buffer of 1000. I know the 1000 was an example, but if you don't make it bigger you might just be moving the problem to the waitingQueue rather than solving it: the waitingQueue will receive new requests faster than the processingQueue can drain them, effectively giving you an overflow in the waitingQueue.
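One way to read this design in Java: the waitingQueue is a bounded BlockingQueue the web layer offers into, and the "processingQueue" becomes a 1000-element batch the consumer drains into. A rough sketch (the names and the writeToFile call are placeholders, and exception handling is omitted):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Bounded, so it cannot grow without limit if the file writer falls behind.
BlockingQueue<Object> waitingQueue = new LinkedBlockingQueue<>(100_000);

// Consumer thread: pull up to 1000 requests at a time and write them out.
List<Object> processingBatch = new ArrayList<>(1000);
while (true) {
    processingBatch.add(waitingQueue.take());      // wait for at least one element
    waitingQueue.drainTo(processingBatch, 999);    // then grab up to 999 more
    writeToFile(processingBatch);                  // placeholder for the serializer
    processingBatch.clear();
}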

Instead of putting each request object in a queue, allocate an array of size 1000, and when it is filled, put that array on the queue to the sender thread, which serializes and sends the whole array. Then allocate another array.

How are you going to handle the situation when the sender cannot work fast enough and its queue overflows? To avoid an out-of-memory error, use a queue of limited size.
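A sketch of that approach, assuming a single thread fills the array (the multi-threaded web layer would need its own synchronization around add()); the class and method names are made up for illustration, and the bounded hand-off queue covers the overflow concern:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ArrayBuffer {
    // Bounded hand-off queue: when the sender falls behind, offer() fails
    // instead of letting memory grow without limit.
    private final BlockingQueue<Object[]> sendQueue = new ArrayBlockingQueue<>(50);
    private Object[] current = new Object[1000];
    private int index = 0;

    void add(Object request) {
        current[index++] = request;
        if (index == 1000) {
            if (!sendQueue.offer(current)) {
                // sender is overloaded: drop, block or log, a policy decision
            }
            current = new Object[1000];   // hand the old array away, allocate a fresh one
            index = 0;
        }
    }

    // The sender thread loops on this, serializes the array and ships the file.
    Object[] nextBatch() throws InterruptedException {
        return sendQueue.take();
    }
}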

I might be getting something wrong, but you could use an ArrayList for this, since you don't need to poll per element from your queue. You just flush (create a copy and clear) your list in a synchronized section when its size reaches the limit and you need to send it. Adding to this list should also be synchronized with this flush operation.

Swapping your arrays might not be safe: if your sending is slower than your generation, buffers may soon start overwriting each other. And allocating 20,000 array elements per second is almost nothing for the GC.

Object lock = new Object();

List<Object> list = new ArrayList<>();

// producer side: every incoming request object is added under the lock
synchronized (lock) {
    list.add(request);
}

...

// this check outside the sync block is a quick, dirty performance shortcut;
// it is not valid outside the sync block, but it takes less than a nanosecond
// and filters out 99.9% of the `synchronized(lock)` sections
if (list.size() > 1000) {
    synchronized (lock) {          // this should take less than a microsecond
        if (list.size() > 1000) {  // this check is the valid one
            // make sure the send is async (i.e. done in a separate thread) or takes <1 ms;
            // the new ArrayList allocation must be the slowest part here
            sendAsyncInASeparateThread(new ArrayList<>(list));
            list.clear();
        }
    }
}

UPDATE

Considering that sending is async, the slowest part here is new ArrayList(list), which should take around 1 microsecond for 1000 elements, i.e. about 20 microseconds of copying per second at your rate. I didn't measure that; I derived it by proportion from about 1 million elements being allocated in roughly 1 ms.

If you still require a super-fast synchronized queue, you might want to have a look at MentaQueue.

What do you mean by "switch pointers"? There are no pointers in Java (unless you're talking about references).

Anyway, as you probably saw from the Javadoc, ConcurrentLinkedQueue has a "problem" with the size() method: it is not a constant-time operation, and it can return inaccurate results if the queue is modified during traversal. Still, you could use your original idea of 2 (or more) buffers that get switched. There are probably going to be bottlenecks with the disk I/O anyway, so maybe the non-constant time of size() won't be a problem here either.

Of course, if you want it to be non-blocking, you'd better have a lot of memory and a fast disk (and larger buffers).
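As for "switching pointers": in Java that just means swapping references, for example with an AtomicReference. A hedged sketch (handOffToWriter is a placeholder, and the counter sidesteps the non-constant size()); note that a thread can still append to the old queue for a brief moment after the swap, so the writer should drain whatever it receives rather than assume exactly 1000 elements:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// "Switching pointers" in Java terms: atomically replace the queue reference.
AtomicReference<ConcurrentLinkedQueue<Object>> active =
        new AtomicReference<>(new ConcurrentLinkedQueue<>());
AtomicInteger added = new AtomicInteger();      // cheap running count, avoids size()

void add(Object request) {
    active.get().add(request);
    if (added.incrementAndGet() % 1000 == 0) {
        // swap in a fresh queue and hand the full one to the writer thread
        ConcurrentLinkedQueue<Object> full =
                active.getAndSet(new ConcurrentLinkedQueue<>());
        handOffToWriter(full);                  // placeholder: writer drains and serializes it
    }
}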

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow