Question

I need to measure the rate at which a software system is consuming messages from a message queue and report on it periodically.

Specifically, messages arrive from a message queueing system and I need to report (each second) on the number of messages received within a number of rolling windows - e.g. the last second, the last 5 seconds, the last 30 seconds, etc.

Whilst I'm sure I could build this, I'm not certain that I'd go about it in the most efficient manner! I'm also sure that there are libraries for doing this (I'm using the JVM, so Apache Commons Math springs to mind), but I don't even know the right words to Google for! :-)

Solution 2

This is what I ended up writing.

package com.example;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BucketCounter {
    private final Lock rollLock = new ReentrantLock();
    private final int[] bucketSizes;
    private final int[] buckets;
    private final int[] intervals;
    private final AtomicInteger incoming = new AtomicInteger(0);

    public BucketCounter(int... bucketSizes) {
        if (bucketSizes.length < 1) {
            throw new IllegalArgumentException("Must specify at least one bucket size");
        }

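        // Sort a private copy so the caller's array is not reordered in place.
        this.bucketSizes = bucketSizes.clone();
        Arrays.sort(this.bucketSizes);
        this.buckets = new int[bucketSizes.length];

        if (this.bucketSizes[0] < 1) {
            throw new IllegalArgumentException("Cannot have a bucket of size < 1");
        }

        // Per-roll history, newest first; as long as the largest bucket.
        intervals = new int[this.bucketSizes[this.bucketSizes.length - 1]];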
    }

    public int count(int n) {
        return incoming.addAndGet(n);
    }

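    public int[] roll() {
        rollLock.lock();
        try {
            // Drain the pending count inside the lock so concurrent roll()
            // calls record their intervals in a consistent order.
            final int toAdd = incoming.getAndSet(0);
            final int[] results = new int[buckets.length];

            for (int i = 0, n = buckets.length; i < n; i++) {
                // Slide window i: drop the interval that just aged out, add the newest.
                results[i] = buckets[i] = buckets[i] - intervals[bucketSizes[i] - 1] + toAdd;
            }

            // Shift the history one slot towards the end and store the newest interval.
            System.arraycopy(intervals, 0, intervals, 1, intervals.length - 1);
            intervals[0] = toAdd;

            return results;
        } finally {
            rollLock.unlock();
        }
    }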
}

Initialise it by passing the window sizes (e.g. 1, 5, 30), then arrange for a background thread to call roll() once per "time period". If you call it every second, the buckets cover 1, 5 and 30 seconds; if you call it every 5 seconds, they cover 5, 25 and 150 seconds, and so on. In other words, bucket sizes are expressed as a number of calls to roll().
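One way to drive roll() from a background thread is a ScheduledExecutorService. The sketch below is only an illustration: RollScheduler, its parameters and the thread name are made up for this example, and it takes the roll call as a Supplier so that it does not depend on the class above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of the answer's code.
class RollScheduler {
    // Calls roller.get() once per period on a single daemon thread and passes
    // the returned counts to reporter. If either callback throws, the
    // executor suppresses all later runs, so keep them exception-safe.
    static ScheduledExecutorService start(Supplier<int[]> roller,
                                          Consumer<int[]> reporter,
                                          long period, TimeUnit unit) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "bucket-roller");
                    t.setDaemon(true); // do not keep the JVM alive just for reporting
                    return t;
                });
        scheduler.scheduleAtFixedRate(
                () -> reporter.accept(roller.get()), period, period, unit);
        return scheduler;
    }
}
```

With a BucketCounter named counter, this could be started as RollScheduler.start(counter::roll, counts -> System.out.println(java.util.Arrays.toString(counts)), 1, TimeUnit.SECONDS), and stopped by calling shutdown() on the returned scheduler.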

roll() also returns an array of the current counts for each bucket. Note that these numbers are raw counts and are not averaged per time interval. You'll need to do that division yourself if you want to measure "rates" rather than "counts".
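That division could look something like the sketch below. BucketRates and toRates are names invented for this example; it assumes bucketSizes is given in ascending order (the order the counter reports in) and that rollPeriodSeconds is how often roll() is called.

```java
// Hypothetical helper, not part of the answer's code.
class BucketRates {
    // Converts raw window counts into events per second.
    // Window i spans bucketSizes[i] rolls, each rollPeriodSeconds long.
    static double[] toRates(int[] counts, int[] bucketSizes, double rollPeriodSeconds) {
        if (counts.length != bucketSizes.length) {
            throw new IllegalArgumentException("counts and bucketSizes must have the same length");
        }
        double[] rates = new double[counts.length];
        for (int i = 0; i < counts.length; i++) {
            rates[i] = counts[i] / (bucketSizes[i] * rollPeriodSeconds);
        }
        return rates;
    }
}
```

Bear in mind that until roll() has been called at least as many times as a bucket's size, that bucket's count covers fewer intervals than its nominal window, so the rate computed this way will read low during warm-up.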

Finally, every time an event happens, call count(). I've set up a system with a few of these and I call count(1) on each message to count incoming messages, count(message.size()) on each message to count incoming byte rates, etc.

Hope that helps.

Other tips

Here is my solution based on exponential smoothing. It doesn't require any background threads. You would create one instance for each rolling window that you want to track, and for each relevant event you would call newEvent on each instance.

public class WindowedEventRate {

  private double normalizedRate; // event rate / window
  private long windowSizeTicks;
  private long lastEventTicks;


  public WindowedEventRate(int aWindowSizeSeconds) {
    windowSizeTicks = aWindowSizeSeconds * 1000L;
    lastEventTicks = System.currentTimeMillis();
  }

  public double newEvent() {

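    long currentTicks = System.currentTimeMillis();
    // Clamp to 1 ms: a zero period would make the frequency infinite and the
    // 0 * Infinity product below would turn the rate into NaN.
    long period = Math.max(1L, currentTicks - lastEventTicks);
    lastEventTicks = currentTicks;
    double normalizedFrequency = (double) windowSizeTicks / (double) period;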

    double alpha = Math.min(1.0 / normalizedFrequency, 1.0);
    normalizedRate = (alpha * normalizedFrequency) + ((1.0 - alpha) * normalizedRate);
    return getRate();
  }

  public double getRate() {
    return normalizedRate * 1000L / windowSizeTicks;
  }
}
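To see how this smoothing behaves, it can help to run the same update rule against a fake clock. The sketch below is a hypothetical variant, not the class above: SmoothedRateDemo, the injected LongSupplier clock and steadyStateDemo are assumptions made for this illustration, and the period is clamped to 1 ms to avoid a division by zero.

```java
import java.util.function.LongSupplier;

// Hypothetical variant of the smoothing above with an injectable clock.
class SmoothedRateDemo {
    private final long windowMillis;
    private final LongSupplier clock;   // returns "now" in milliseconds
    private double eventsPerWindow;     // smoothed events per window
    private long lastEventMillis;

    SmoothedRateDemo(int windowSeconds, LongSupplier clock) {
        this.windowMillis = windowSeconds * 1000L;
        this.clock = clock;
        this.lastEventMillis = clock.getAsLong();
    }

    // Records one event and returns the smoothed rate in events per second.
    double newEvent() {
        long now = clock.getAsLong();
        long period = Math.max(1L, now - lastEventMillis);
        lastEventMillis = now;
        double frequency = (double) windowMillis / period;
        // alpha is period / window, capped at 1: longer gaps weigh more.
        double alpha = Math.min(1.0 / frequency, 1.0);
        eventsPerWindow = alpha * frequency + (1.0 - alpha) * eventsPerWindow;
        return eventsPerWindow * 1000.0 / windowMillis;
    }

    // Feeds 1000 events spaced exactly 100 ms apart into a 5 second window.
    static double steadyStateDemo() {
        long[] fakeNow = {0L};
        SmoothedRateDemo rate = new SmoothedRateDemo(5, () -> fakeNow[0]);
        double perSecond = 0.0;
        for (int i = 0; i < 1000; i++) {
            fakeNow[0] += 100;
            perSecond = rate.newEvent();
        }
        return perSecond;
    }
}
```

With events arriving at a steady 100 ms spacing, the returned value climbs from zero towards 10 events per second. The estimate only changes when newEvent is called, so if events stop arriving the reported rate stays at its last value rather than decaying.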

You could probably implement it as an interceptor, so search for interceptor combined with the message queue product name and the language name.

Licensed under: CC-BY-SA with attribution