Spark 1.4 introduced window functions, which let you compute a moving average as shown below. Adjust the window size with rowsBetween:
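import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
// spark-shell and Zeppelin normally provide sc and import the sqlContext implicits (toDF, $"...").
val schema = Seq("id", "cykle", "value")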
val data = Seq(
(1, 1, 1),
(1, 2, 11),
(1, 3, 1),
(1, 4, 11),
(1, 5, 1),
(1, 6, 11),
(2, 1, 1),
(2, 2, 11),
(2, 3, 1),
(2, 4, 11),
(2, 5, 1),
(2, 6, 11)
)
val dft = sc.parallelize(data).toDF(schema: _*)
dft.select('*).show
// Equivalent SQL frame: PARTITION BY id ORDER BY cykle ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING (up to 5 rows per window)
val w = Window.partitionBy("id").orderBy("cykle").rowsBetween(-2, 2)
val x = dft.select($"id",$"cykle",avg($"value").over(w))
x.show
Output (in Zeppelin):
schema: Seq[String] = List(id, cykle, value)
data: Seq[(Int, Int, Int)] = List((1,1,1), (1,2,11), (1,3,1), (1,4,11), (1,5,1), (1,6,11), (2,1,1), (2,2,11), (2,3,1), (2,4,11), (2,5,1), (2,6,11))
dft: org.apache.spark.sql.DataFrame = [id: int, cykle: int, value: int]
+---+-----+-----+
| id|cykle|value|
+---+-----+-----+
| 1| 1| 1|
| 1| 2| 11|
| 1| 3| 1|
| 1| 4| 11|
| 1| 5| 1|
| 1| 6| 11|
| 2| 1| 1|
| 2| 2| 11|
| 2| 3| 1|
| 2| 4| 11|
| 2| 5| 1|
| 2| 6| 11|
+---+-----+-----+
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@55cd666f
x: org.apache.spark.sql.DataFrame = [id: int, cykle: int, 'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING: double]
+---+-----+-------------------------------------------------------------------------+
| id|cykle|'avg(value) WindowSpecDefinition ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING|
+---+-----+-------------------------------------------------------------------------+
| 1| 1| 4.333333333333333|
| 1| 2| 6.0|
| 1| 3| 5.0|
| 1| 4| 7.0|
| 1| 5| 6.0|
| 1| 6| 7.666666666666667|
| 2| 1| 4.333333333333333|
| 2| 2| 6.0|
| 2| 3| 5.0|
| 2| 4| 7.0|
| 2| 5| 6.0|
| 2| 6| 7.666666666666667|
+---+-----+-------------------------------------------------------------------------+
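
Note that the first and last rows of each id average fewer than five values, because the frame is clipped at the partition boundaries. To make that arithmetic explicit, here is a minimal plain-Scala sketch (no Spark involved) of what the frame does for a single partition. The names movingAvg, before and after are made up for illustration and are not part of the Spark API:

```scala
// Plain-Scala sketch of ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING for one partition.
// movingAvg, before and after are illustrative names, not Spark API.
def movingAvg(values: Seq[Double], before: Int, after: Int): Seq[Double] =
  values.indices.map { i =>
    // Clip the frame to the partition bounds, as a ROWS frame does at the edges.
    val from = math.max(0, i - before)
    val until = math.min(values.length, i + after + 1)
    val frame = values.slice(from, until)
    frame.sum / frame.length
  }

// The "value" column for id = 1, already ordered by cykle.
val values = Seq(1.0, 11.0, 1.0, 11.0, 1.0, 11.0)
val result = movingAvg(values, before = 2, after = 2)
result.foreach(println)
```

For cykle 1 the frame only holds rows 1 to 3, so the average is (1 + 11 + 1) / 3, which is where the 4.33 in the table comes from. If you need a value only when the full five rows are available, you would have to filter or null out the edge rows yourself.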