Question

I have a few million datapoints, each with a time and a value. I'm interested in knowing all of the sliding windows (i.e., chunks of 4000 datapoints) where the range from high to low of the window exceeds a constant threshold.

For example, assume a window of length 3 and a threshold where high - low > 3. Then the series [10 12 14 13 10 11 16 14 17] would result in [0, 2, 4, 5], because those are the indexes where the 3-period window's high-low range exceeded the threshold.

I have a window size of 4000 and a dataset size of millions.

The naive approach is to just calculate every possible window range, i.e., 1-4000, 2-4001, 3-4002, etc., and accumulate the sets that breached the threshold. As you might imagine, this takes forever for large datasets.
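
In Python terms, the naive version is essentially this (just a sketch, with illustrative names):

def slide_threshold_naive(v, w, t):
    # rescan every window in full: O(n*w)
    return [i for i in range(len(v) - w + 1)
            if max(v[i:i + w]) - min(v[i:i + w]) > t]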

So, the algorithm I think would be better is the following:

Calculate the range of the first window (1-4000), and store the indexes of the high/low of that window. Then iterate to (2-4001), (3-4002), etc. Only update the high/low index if the NEW value on the far right of the window is higher/lower than the old cached value.

Now, let's say the high/low indexes of the 1-4000 window are 333 and 666 respectively. I iterate and continue updating new highs/lows as I see them on the right, but as soon as the window is at 334-4333 (as soon as the cached high/low is outside of the current window) I recalculate the high/low for the current window (334-4333), cache them, and continue iterating.
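
Sketched in Python (0-based indexing; names are illustrative, and the rescan recomputes both extremes over the whole current window):

def slide_threshold_cached(v, w, t):
    breached = []
    # cached indexes of the current window's high and low
    hi = max(range(w), key=lambda k: v[k])
    lo = min(range(w), key=lambda k: v[k])
    for i in range(len(v) - w + 1):
        j = i + w - 1  # rightmost index of the window
        if hi < i or lo < i:
            # a cached extreme fell out of the window: rescan it
            hi = max(range(i, j + 1), key=lambda k: v[k])
            lo = min(range(i, j + 1), key=lambda k: v[k])
        else:
            # otherwise only the new rightmost value can change the extremes
            if v[j] > v[hi]:
                hi = j
            if v[j] < v[lo]:
                lo = j
        if v[hi] - v[lo] > t:
            breached.append(i)
    return breached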

My question is:

1.) Is there a mathematical formula for this that eliminates the need for an algorithm at all? I know there are formulas for weighted and exponential moving averages over a window period that don't require recalculation of the window.

2.) Is my algorithm sensible? Accurate? Is there a way it could be greatly simplified or improved?

Thanks a lot.

Solution

If the data length is n and window size m, then here's an O(n log m) solution using sorted-maps.

(defn freqs 
  "Like frequencies but uses a sorted map"
  [coll]
  (reduce (fn [counts x] 
            (assoc counts x (inc (get counts x 0)))) 
          (sorted-map) coll))

(defn rng
  "Return max - min value of a sorted-map (log time)"
  [smap]
  (- (ffirst (rseq smap)) (ffirst smap)))

(defn slide-threshold [v w t]
  (loop [q (freqs (subvec v 0 w)), i 0, j (+ i w), a []]
    ;; record window i if its high-low range exceeds the threshold
    (let [a* (if (> (rng q) t) (conj a i) a)]
      (if (= (count v) j)
        a*
        (let [q* (merge-with + q {(v i) -1} {(v j) 1})
              q* (if (zero? (q* (v i))) (dissoc q* (v i)) q*)]
          (recur q* (inc i) (inc j) a*))))))

(slide-threshold [10 12 14 13 10 11 16 14 17] 3 3)
;=> [0 2 4 5]

OTHER TIPS

The naive version is not linear. Linear would be O(n); the naive algorithm is O(n*k), where k is the window size. Your improvement is also O(n*k) in the worst case (imagine a sorted array), but in the general case you should see a big improvement in running time, because you'll avoid a large number of recalculations.

You can solve this in O(n log k) by using a Min-max heap (or two heaps), but you have to use a type of heap that can remove an arbitrary node in O(log k). You can't use a standard binary heap because although removing an arbitrary node is O(log k), finding the node is O(k).

Assuming you have a Min-max heap, the algorithm looks like this:

heap = create empty heap
add first k items to the heap
if (heap.MaxItem - heap.MinItem) > threshold
    output range starting at 0
for (i = k; i < n; ++i)
{
    remove item i-k from the heap
    add item i to the heap
    if (heap.MaxItem - heap.MinItem) > threshold
        output range starting at i-k+1
}

The problem, of course, is removing item i-k from the heap. Actually, the problem is finding it efficiently. The way I've done this in the past is to modify my binary heap so that it stores nodes that contain an index and a value. The heap comparisons use the value, of course. The index is the node's position in the backing array, and is updated by the heap whenever the node is moved. When an item is added to the heap, the Add method returns a reference to the node, which I maintain in an array. Or in your case you can maintain it in a queue.

So the algorithm looks like this:

queue = create empty queue of heap nodes
heap = create empty heap
for (i = 0; i < k; ++i)
{
    node = heap.Add(array[i]);
    queue.Add(node);
}
if (heap.MaxItem - heap.MinItem) > threshold
    output range starting at 0
for (i = k; i < n; ++i)
{
    node = queue.Dequeue()
    remove item at position node.Index from the heap
    node = heap.Add(array[i])
    queue.Add(node)
    if (heap.MaxItem - heap.MinItem) > threshold
        output range starting at i-k+1
}

This is provably O(n log k). Every item is read once, added to the heap, and eventually removed from the heap, and each of those heap operations is O(log k). In addition, every item is added to the queue and removed from the queue, but those two operations are O(1).

For those of you who doubt me, it is possible to remove an arbitrary element from a heap in O(log k) time, provided that you know where it is. I explained the technique here: https://stackoverflow.com/a/8706363/56778.
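
Here is a minimal sketch of that removal in Python (an illustration, not the code from the linked answer): overwrite the element at the known position with the last element, shrink the array, then sift the displaced element up or down to restore the heap property.

def heap_remove_at(heap, pos):
    # 'heap' is a plain list satisfying the min-heap property
    last = heap.pop()            # O(1)
    if pos == len(heap):         # the target was the last element
        return
    heap[pos] = last
    # sift up while smaller than the parent
    while pos > 0 and heap[pos] < heap[(pos - 1) // 2]:
        parent = (pos - 1) // 2
        heap[pos], heap[parent] = heap[parent], heap[pos]
        pos = parent
    # sift down while larger than the smaller child
    n = len(heap)
    while 2 * pos + 1 < n:
        child = 2 * pos + 1
        if child + 1 < n and heap[child + 1] < heap[child]:
            child += 1
        if heap[pos] <= heap[child]:
            break
        heap[pos], heap[child] = heap[child], heap[pos]
        pos = child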

So, if you have a window of size k, running time will be roughly proportional to 3n * 2 log2(k). Given a million items and a window size of 5,000, that works out to 3,000,000 * (12.3 * 2), or about 75 million operations. That's roughly equivalent to having to recompute the full 5,000-item window in your optimized naive method about 15,000 times.

As I said, your optimized method can end up taking a long time if the array is sorted, or nearly so. The heap algorithm I outlined above doesn't suffer from that.

You should give your "better" algorithm a try and see if it's fast enough. If it is, and you don't expect pathological data, then great. Otherwise take a look at this technique.

There are some algorithms to keep the minimum (or maximum) value in a sliding window with amortized complexity O(1) per element (O(n) for the whole dataset). This is one of them, using a deque data structure that contains value/index pairs. You have to keep two such deques, one for the min and one for the max (each with max length 4000).

 at every step (T is the window size):
  if (!Deque.Empty) and (Deque.Head.Index <= CurrentIndex - T) then 
     Deque.ExtractHead;
  //Head is too old, it is leaving the window

  while (!Deque.Empty) and (Deque.Tail.Value > CurrentValue) do
     Deque.ExtractTail;
  //remove elements that have no chance to become minimum in the window

  Deque.AddTail(CurrentValue, CurrentIndex); 
  CurrentMin = Deque.Head.Value
  //Head value is minimum in the current window
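
As a concrete sketch, here is the same recipe in Python, applied to the full high-low range problem, with one monotonic deque for minimums and a mirrored one for maximums (an illustration; the names are mine):

from collections import deque

def slide_threshold_deque(v, w, t):
    mins, maxs = deque(), deque()  # (value, index) pairs, kept monotonic
    breached = []
    for j, x in enumerate(v):
        # drop tail elements that can no longer be the window's min/max
        while mins and mins[-1][0] >= x:
            mins.pop()
        while maxs and maxs[-1][0] <= x:
            maxs.pop()
        mins.append((x, j))
        maxs.append((x, j))
        i = j - w + 1  # start of the window ending at j
        if i < 0:
            continue   # window not full yet
        # heads whose index has left the window
        while mins[0][1] < i:
            mins.popleft()
        while maxs[0][1] < i:
            maxs.popleft()
        if maxs[0][0] - mins[0][0] > t:
            breached.append(i)
    return breached

slide_threshold_deque([10, 12, 14, 13, 10, 11, 16, 14, 17], 3, 3)
# => [0, 2, 4, 5]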

Another approach uses two stacks, each tracking its own minimum, to simulate the queue.

Here is Python code for this problem, using two heaps with lazy deletion:

import heapq

l = [10, 12, 14, 13, 10, 11, 16, 14, 17]
w = 3
threshold = 3
breached_indexes = []

#set up the heaps for the initial window
min_values = [(l[i], i) for i in range(0, w)]
max_values = [(-l[i], i) for i in range(0, w)]
heapq.heapify(min_values)
heapq.heapify(max_values)

#check whether the first window breaches the threshold
if -max_values[0][0] - min_values[0][0] > threshold:
    breached_indexes.append(0)

for i in range(1, len(l) - w + 1):
    #push the element entering the window into both heaps
    heapq.heappush(min_values, (l[i + w - 1], i + w - 1))
    heapq.heappush(max_values, (-l[i + w - 1], i + w - 1))

    #lazily pop heap tops that have fallen out of the window
    while min_values[0][1] < i:
        heapq.heappop(min_values)

    while max_values[0][1] < i:
        heapq.heappop(max_values)

    #check the breach
    if -max_values[0][0] - min_values[0][0] > threshold:
        breached_indexes.append(i)

print(breached_indexes)

Explanation:

  1. Maintain 2 heaps, a min-heap and a max-heap, both holding (value, index) pairs
  2. At every step when we move the window, do the following

    a. Push the element that newly entered the window into both heaps.
    b. Pop items from the top of each heap while their indexes fall outside the window.
    c. Check whether the threshold is breached by comparing the top elements of the two heaps, and record the index, if needed.

*I use negative values for max_values, since Python's heapq implementation is a min-heap

Because stale entries are removed lazily, the heaps can grow beyond the window size, so the worst-case complexity of this algorithm is O(n log n).

Just wanted to play with an idea inspired by the Simple Moving Average concept.

Let's consider 9 points with a sliding window of size 4. At any point, we'll keep track of the maximum values for all windows of size 4, 3, 2, and 1 respectively that end at that point. Suppose we store them in arrays...

  • At position 1 (p1), we have one value (v1) and one window {p1}; the array A1 contains max(v1)
  • At position 2 (p2), we have two values (v1, v2) and two windows {p1, p2} and {p2}; the array A2 contains max(v1, v2) and max(v2)
  • At position 3 (p3), following the same pattern, the array A3 contains max(v1, v2, v3) = max(max(v1, v2), v3), max(v2, v3), and max(v3). Observe that we already know max(v1, v2) from A2
  • Let's jump ahead to position 6 (p6): the array A6 contains max(v3, v4, v5, v6), max(v4, v5, v6), max(v5, v6), and max(v6). Again, we already know max(v3, v4, v5), max(v4, v5), and max(v5) from A5.

Roughly, it looks something like this:

    1  2  3  4  5  6  7  8  9

    1  1  1  1
    x  2  2  2  2
    x  x  3  3  3  3
    x  x  x  4  4  4  4
                5  5  5  5
                   6  6  6  6
                      7  7  7
                         8  8
                            9

This can be generalized as follows:

Let 
n   number of datapoints
s   window size, 1 <= s <= n
i   current position / datapoint, 1 <= i <= n
Vi  value at position i
Ai  array at position i (note: the array starts at 1 in this definition)

then
Ai (i <= s) has elements 
aj = max(Vi, Ai-1[j]) for j in (1..i-1)
aj = Vi for j = i
aj = undefined/unimportant for j in (i+1..s)  

Ai (i > s) has elements 
aj = max(Vi, Ai-1[j+1]) for j in (1..s-1) 
aj = Vi for j = s

The max value for the window of size s at position i is given by Ai[1]. Further, one gets as a bonus the max value for a window of any size x (0 < x <= s ) given by Ai[s - x + 1].

In my opinion the following is true:

  • Computational/time complexity is low: there is no sorting, insertion, deletion, or searching; however, the max function is called n*s times
  • Space complexity is bigger (we are storing at least s arrays of size s), but only if we want to persist the result for future queries, which then run in O(1). Otherwise, only two arrays are necessary, Ai-1 and Ai; all we need in order to fill in the array at position i is the array at position i-1
  • We still cannot easily make this algorithm run in parallel processes
  • Using this algorithm to calculate min and max values, we can efficiently accumulate sliding-window percentage changes over a large dataset

I added a sample implementation / test bed in JavaScript for it on GitHub - SlidingWindowAlgorithm. Here is a copy of the algorithm itself (please note that in this implementation the array is indexed from 0):

var evalMaxInSlidingWindow = function(datapoints, windowsize){
    var Aprev = [];
    var Acurr = [];
    var Aresult = [];

    for (var i = 0, len = datapoints.length; i < len; i++)
    {
        // growing phase: windows ending at i are not yet full
        if (i < windowsize)
        {
            for(var j = 0; j < windowsize; j++)
            {
                if (j < i)
                {
                    Acurr[j] = Math.max(datapoints[i], Aprev[j]);
                }
                if (j == i)
                {
                    Acurr[j] = datapoints[i];
                }
            }
        } 
        else 
        {
            // steady phase: shift the previous maxima left and fold in the new value
            for(var j = 0; j < windowsize; j++)
            {
                if (j < windowsize - 1)
                {
                    Acurr[j] = Math.max(datapoints[i], Aprev[j + 1]);
                }
                if (j == windowsize - 1)
                {
                    Acurr[j] = datapoints[i];
                }
            }
        }

        // Acurr[0] is the max of the (up to) windowsize values ending at i
        Aresult.push(Acurr[0]);
        Aprev = [].concat(Acurr);
    }

    return Aresult;
};
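
For the series from the question, evalMaxInSlidingWindow([10, 12, 14, 13, 10, 11, 16, 14, 17], 3) returns [10, 12, 14, 14, 14, 13, 16, 16, 17], the max of each window of up to 3 values ending at each position.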

After a discussion with Scott, it seems that this algorithm does nothing special. Well, it was fun playing with it. : )

Licensed under: CC-BY-SA with attribution