Ignoring existing data structures that might help here, the problem can fundamentally be solved in two ways: either the thread(s) accepting messages perform the checks and notify the aggregation thread, or the aggregation thread polls. The first approach makes the limit check easy; the second makes the timeout easy.
I would suggest combining both: the receiving threads keep track of how many items have accumulated and notify the aggregating thread once the threshold has been reached, while the aggregating thread keeps track of the time.
A simplistic version could look something like this:
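// Needs java.util.ArrayList, java.util.List and java.util.concurrent.ArrayBlockingQueue
final long maxWait = 1000; // milliseconds
final int maxMessages = 10;
// The capacity is arbitrary; it only has to be larger than maxMessages
final ArrayBlockingQueue<Message> queue = new ArrayBlockingQueue<>( 1000 );
// Dedicated monitor; waiting on the Thread object itself is discouraged
final Object lock = new Object();

final Thread aggregator = new Thread()
{
    @Override
    public void run() {
        try {
            List<Message> messages = new ArrayList<>();
            while ( true ) {
                // wait() must be called while holding the monitor
                synchronized ( lock ) {
                    if ( queue.size() < maxMessages ) {
                        // Returns on notify() or after maxWait, whichever comes first
                        lock.wait( maxWait );
                    }
                }
                messages.clear();
                queue.drainTo( messages );
                // Store messages
            }
        }
        catch ( InterruptedException e ) {
            // Handle this..
        }
    }
};

final Thread receiver = new Thread()
{
    @Override
    public void run() {
        try {
            while ( true ) {
                // readFromNetwork() is a placeholder: get the message from the network
                Message message = readFromNetwork();
                queue.put( message );
                if ( queue.size() >= maxMessages ) {
                    // notify() must also be called while holding the monitor
                    synchronized ( lock ) {
                        lock.notify();
                    }
                }
            }
        }
        catch ( InterruptedException e ) {
            // Handle this..
        }
    }
};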
This does not handle your message grouping, but I'm sure you can see how it can be extended to multiple queues, one per message type. To make the aggregator consider only a specific message type when it is notified, you could replace wait/notify with a more elaborate signalling mechanism. For instance, the aggregator could wait on a queue of its own, into which the receiving threads put the queues that need to be aggregated and stored, using them as "messages".
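As a rough sketch of that last idea, and not a definitive implementation, the class below keeps one queue per message type plus a second queue of "ready" notifications that the aggregator waits on with a timeout. Everything in it is an assumption made for illustration: the name `TypedBatcher`, the methods `accept` and `nextBatches`, and the use of plain strings for both message types and messages. It also puts the type name on the notification queue rather than the queue object itself, so that the batch can be labelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: one queue per message type, plus a queue of type names
// whose queues have reached the threshold.
class TypedBatcher {
    private final int maxMessages;
    private final long maxWaitMillis;
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
    // Receivers put the type of a full queue here; the aggregator waits on it.
    private final BlockingQueue<String> ready = new LinkedBlockingQueue<>();

    TypedBatcher(int maxMessages, long maxWaitMillis) {
        this.maxMessages = maxMessages;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Called by receiving threads.
    void accept(String type, String message) {
        BlockingQueue<String> queue =
                queues.computeIfAbsent(type, t -> new LinkedBlockingQueue<>());
        queue.add(message);
        if (queue.size() >= maxMessages) {
            // May notify more than once for the same type; stale
            // notifications simply produce an empty drain later.
            ready.offer(type);
        }
    }

    // Called by the aggregating thread. Returns the batch for one full queue,
    // or, if maxWaitMillis passes without a notification, whatever has
    // accumulated in all queues. The result may be empty.
    Map<String, List<String>> nextBatches() {
        Map<String, List<String>> batches = new HashMap<>();
        try {
            String type = ready.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
            if (type != null) {
                drain(type, batches);
            } else {
                for (String t : queues.keySet()) {
                    drain(t, batches);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so the caller can decide how to shut down.
            Thread.currentThread().interrupt();
        }
        return batches;
    }

    private void drain(String type, Map<String, List<String>> batches) {
        List<String> batch = new ArrayList<>();
        queues.get(type).drainTo(batch);
        if (!batch.isEmpty()) {
            batches.put(type, batch);
        }
    }
}
```

The aggregating thread would call `nextBatches()` in a loop and store whatever comes back, while receiving threads call `accept(...)`. Because the timeout lives in `poll`, no explicit wait/notify or `synchronized` block is needed in this variant.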