RxJava version: 0.15.1
import java.util.List;
import rx.Observable;
import rx.util.functions.Action1;
/**
 * Demonstrates a simple moving average over a fixed series of integers
 * using RxJava's {@code buffer} and {@code take} operators.
 *
 * <p>The series is grouped into overlapping windows of {@link #WINDOW}
 * consecutive values (advancing one value at a time), and the integer
 * average of each window is printed to standard output.
 */
class Bar {

    /** Number of consecutive values averaged together (the "a" in MA(a)). */
    private static final int WINDOW = 3;

    /**
     * Prints the moving average of every full window of the sample series.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Integer[] values = {1, 2, 3, 4, 5, 6}; // N = 6

        // A series of N values contains N - (WINDOW - 1) full windows:
        //   WINDOW = 3 => N - 2, WINDOW = 4 => N - 3, ...
        // Deriving the count keeps it in step with the data and the window
        // size instead of relying on a hand-computed literal. It is clamped
        // at zero so a series shorter than the window requests no windows.
        int fullWindows = Math.max(0, values.length - (WINDOW - 1));

        Observable.from(values)
                // 1.- bundle WINDOW values, advancing by 1 value per bundle
                .buffer(WINDOW, 1)
                // 2.- keep only the leading full windows; presumably any later
                //     bundles would hold fewer than WINDOW values
                .take(fullWindows)
                // 3.- calculate and print the average of each window
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> window) {
                        int sum = 0;
                        for (int value : window) {
                            sum += value;
                        }
                        // NOTE: integer division, so the average is truncated
                        // toward zero when the sum is not a multiple of the size.
                        System.out.println("MA(" + WINDOW + ") " + window
                                + " => " + sum / window.size());
                    }
                });
    }
}
Sample output:
MA(3) [1, 2, 3] => 2
MA(3) [2, 3, 4] => 3
MA(3) [3, 4, 5] => 4
MA(3) [4, 5, 6] => 5