Question

I'm working on a Java SE (+ Netty) based system that receives messages of different types from clients, aggregates them and pushes the aggregated results into storage.

I need to pre-accumulate messages before aggregation until one of two conditions is met: the timeout is exceeded or the quantity is exceeded. Timeouts and quantities are pre-configured for each type and may differ greatly. After that, I aggregate/reduce messages of the same type and sender and push the result into storage. Aggregation may be as simple as calculating the average value among messages, or it may be much more complex. Post-aggregation in storage is not acceptable in my case.

The task seems easy, but I'm stuck on the implementation. Obviously I need to collect messages in some data structure and check the timeout and quantity rules on each element. I thought about DelayQueue<Delayed<List<MyMessages>>> (where List<MyMessages> is an aggregatable list of messages).

DelayQueue handles timeouts very well. But it's not clear how to check maximum quantities and add new messages to the Lists efficiently. I don't want to check all Lists on every new message, searching for the right one. And adding data to Delayed<List> elements does not look thread safe.

What data structures/architecture are suitable for the system I'm trying to create? I guess such a problem has a proper academic name and solution; what should I google?

Solution

Ignoring existing data structures that might help here, the problem can fundamentally be solved in two ways: either the thread(s) accepting messages perform the checks and notify the aggregation thread, or the aggregation thread polls. The first approach makes the limit check easy, the second approach makes the timeout easy.

I would suggest combining both: receiving threads keep track of how many items have been accumulated and notify the aggregating thread when the threshold has been reached, and the aggregating thread keeps track of the time.

A simplistic version could look something like this:

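import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final long maxWait = 1000; // milliseconds
final int maxMessages = 10;
// Capacity is an arbitrary choice; it must exceed maxMessages so put() does not block early
final BlockingQueue<Message> queue = new ArrayBlockingQueue<>( 2 * maxMessages );
final Object lock = new Object(); // monitor shared by both threads

final Thread aggregator = new Thread()
{
  @Override
  public void run() {
    try {
      List<Message> messages = new ArrayList<>();
      while ( true ) {
        messages.clear();
        queue.drainTo( messages );

        // Aggregate and store messages

        synchronized ( lock ) {
          // wait() must be called while holding the monitor; skip the wait
          // if the threshold was already reached while we were storing
          if ( queue.size() < maxMessages ) {
            lock.wait( maxWait );
          }
        }
      }
    }
    catch ( InterruptedException e ) {
      Thread.currentThread().interrupt(); // restore the flag and let the thread exit
    }
  }
};

final Thread receiver = new Thread()
{
  @Override
  public void run() {
    try {
      // readFromNetwork() is a hypothetical placeholder: get the message from the network
      Message message = readFromNetwork();
      queue.put( message );
      if ( queue.size() >= maxMessages ) {
        synchronized ( lock ) {
          lock.notify(); // notify() also requires holding the monitor
        }
      }
    }
    catch ( InterruptedException e ) {
      Thread.currentThread().interrupt();
    }
  }
};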

This does not handle your message grouping, but I'm sure you can see how it can be extended to handle multiple queues, one per message type. To make the aggregator only consider a specific message type when it's notified, you could use a more elaborate messaging mechanism instead of wait/notify. For instance, have it wait on a queue instead, where receiving threads in turn put queues as "messages" about queues that need to be aggregated and stored.
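To make the per-type extension more concrete, here is a minimal sketch of one way it could look. Everything in it is my own illustration rather than part of the code above: the class name KeyedBatcher, its methods add, flushExpired and nextBatch, and the simplification that all keys share one maxMessages and one maxWait. Receiving threads call add; a buffer that reaches the quantity limit is handed over through a queue of ready batches, and the aggregating thread moves timed-out buffers onto the same queue before it polls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: per-key buffers that hand completed batches to one aggregator. */
final class KeyedBatcher<K, M> {

    /** One open batch plus the time its first message arrived. */
    private static final class Buffer<T> {
        final long createdAt = System.nanoTime();
        final List<T> items = new ArrayList<>();
    }

    private final int maxMessages;
    private final long maxWaitNanos;
    private final ConcurrentHashMap<K, Buffer<M>> buffers = new ConcurrentHashMap<>();
    private final BlockingQueue<List<M>> ready = new LinkedBlockingQueue<>();

    // Assumption: one quantity limit and one timeout shared by all keys.
    KeyedBatcher(int maxMessages, long maxWaitMillis) {
        this.maxMessages = maxMessages;
        this.maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    }

    /** Called by receiving threads. compute() runs atomically per key. */
    void add(K key, M message) {
        buffers.compute(key, (k, existing) -> {
            Buffer<M> buf = (existing == null) ? new Buffer<>() : existing;
            buf.items.add(message);
            if (buf.items.size() >= maxMessages) {
                ready.add(buf.items); // quantity limit reached: hand the batch over
                return null;          // returning null removes the mapping
            }
            return buf;
        });
    }

    /** Moves every buffer older than the timeout to the ready queue. */
    void flushExpired() {
        long now = System.nanoTime();
        for (K key : buffers.keySet()) {
            buffers.computeIfPresent(key, (k, buf) -> {
                if (now - buf.createdAt >= maxWaitNanos) {
                    ready.add(buf.items);
                    return null;
                }
                return buf;
            });
        }
    }

    /** Called by the aggregating thread; returns null if nothing is ready yet. */
    List<M> nextBatch(long pollMillis) throws InterruptedException {
        flushExpired();
        return ready.poll(pollMillis, TimeUnit.MILLISECONDS);
    }
}
```

The aggregating thread would loop on nextBatch with a poll interval well below the shortest timeout and treat null as "nothing to do yet". Per-type limits could be looked up by key inside add and flushExpired instead of using shared fields.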

Licensed under: CC-BY-SA with attribution