Unlike Daniel's answer, mine does not tag the data as it is produced. It takes at least one element from upstream and then keeps aggregating values in the monoid until the time interval has passed.
This code uses a list to aggregate, but there are better monoids to aggregate with:

import Pipes
import qualified Pipes.Prelude as P
import Data.Time.Clock
import Data.Time.Calendar
import Data.Monoid
import Control.Monad

-- taken from pipes-rt
doubleToNomDiffTime :: Double -> NominalDiffTime
doubleToNomDiffTime x =
  let d0 = ModifiedJulianDay 0
      t0 = UTCTime d0 (picosecondsToDiffTime 0)
      t1 = UTCTime d0 (picosecondsToDiffTime $ floor (x/1e-12))
  in diffUTCTime t1 t0

-- Adapted from pipes-parse-1.0: mark the end of upstream with Nothing
wrap
  :: Monad m =>
     Producer a m r -> Producer (Maybe a) m r
wrap p = do
  p >-> P.map Just
  forever $ yield Nothing

yieldAggregateOverTime
  :: (Monoid y,  -- monoid dependence so we can do aggregation
      MonadIO m  -- to be able to get the current time, the
                 -- base monad must have access to IO
     ) =>
     (t -> y)    -- Change element from upstream to monoid
  -> Double      -- Time in seconds to aggregate over
  -> Pipe (Maybe t) y m ()
yieldAggregateOverTime toMonoid period = do
  t0 <- liftIO getCurrentTime
  loop mempty (dtUTC `addUTCTime` t0)
  where
    dtUTC = doubleToNomDiffTime period
    -- m is the aggregate so far, ts is the deadline of the current interval
    loop m ts = do
      t <- liftIO getCurrentTime
      v0 <- await -- await at least one element
      case v0 of
        Nothing -> yield m -- upstream is done: flush what we have
        Just v -> do
          if t > ts
            then do
              yield (m <> toMonoid v)
              loop mempty (dtUTC `addUTCTime` ts)
            else do
              loop (m <> toMonoid v) ts

main = do
  runEffect $ wrap (each [1..]) >-> yieldAggregateOverTime (\x -> [x]) (0.0001)
                >-> P.take 10 >-> P.print

Depending on CPU load, the output will be aggregated differently from run to run, but always with at least one element in each chunk.
$ ghc Main.hs -O2
$ ./Main
[1,2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
[10]
[11]
$ ./Main
[1,2]
[3]
[4]
[5]
[6,7,8,9,10]
[11,12,13,14,15,16,17,18]
[19,20,21,22,23,24,25,26]
[27,28,29,30,31,32,33,34]
[35,36,37,38,39,40,41,42]
[43,44,45,46,47,48,49,50]
$ ./Main
[1,2,3,4,5,6]
[7]
[8]
[9,10,11,12,13,14,15,16,17,18,19,20]
[21,22,23,24,25,26,27,28,29,30,31,32,33]
[34,35,36,37,38,39,40,41,42,43,44]
[45,46,47,48,49,50,51,52,53,54,55]
[56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72]
[73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88]
[89,90,91,92,93,94,95,96,97,98,99,100,101,102,103]
$ ./Main
[1,2,3,4,5,6,7]
[8]
[9]
[10,11,12,13,14,15,16,17,18]
[19,20,21,22,23,24,25,26,27]
[28,29,30,31,32,33,34,35,36,37]
[38,39,40,41,42,43,44,45,46]
[47,48,49,50]
[51,52,53,54,55,56,57]
[58,59,60,61,62,63,64,65,66]
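
To make the varying chunk sizes easier to reason about, here is a minimal pure sketch of the loop above. It is not the pipe itself: `chunkByDeadline` is a hypothetical name, the arrival times are made-up numbers passed in explicitly instead of being read from the clock, and the start time is assumed to be 0. It also shows a second monoid (`Sum`) in place of the list.

```haskell
import Data.Monoid (Sum (..), (<>))

-- Pure model of the aggregation loop (hypothetical helper, not part of pipes).
-- Each input is paired with an assumed arrival time in seconds.
chunkByDeadline :: Monoid y => (t -> y) -> Double -> [(Double, t)] -> [y]
chunkByDeadline toMonoid period = go mempty period
  where
    -- Upstream ended: flush whatever has been aggregated so far.
    go acc _ [] = [acc]
    go acc deadline ((t, v) : rest)
      -- The element that arrives after the deadline closes the chunk,
      -- which is why every chunk holds at least one element.
      | t > deadline = (acc <> toMonoid v) : go mempty (deadline + period) rest
      | otherwise    = go (acc <> toMonoid v) deadline rest

main :: IO ()
main = do
  let arrivals = [(0.0, 1), (0.5, 2), (1.2, 3), (1.3, 4), (2.5, 5), (2.6, 6)]
        :: [(Double, Int)]
  print (chunkByDeadline (\x -> [x]) 1.0 arrivals)      -- [[1,2,3],[4,5],[6]]
  print (map getSum (chunkByDeadline Sum 1.0 arrivals)) -- [6,9,6]
```

As in the pipe, the next deadline is computed from the previous deadline rather than from the current time. If the consumer falls behind, every new element is already past its deadline, which may be why some runs show a stretch of single-element chunks.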
You might also want to look at the source code of pipes-rt; it shows one approach to dealing with time in pipes.
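
On the time conversion itself: `NominalDiffTime` has `Fractional` and `Real` instances, so the `doubleToNomDiffTime` helper above can probably be replaced by `realToFrac`. The sketch below assumes that is acceptable for your use; `secondsToNomDiffTime` is a hypothetical name, and rounding below one picosecond may differ slightly from the pipes-rt version.

```haskell
import Data.Time.Clock (NominalDiffTime)

-- Hypothetical shorter alternative to doubleToNomDiffTime:
-- interpret a Double as a number of seconds.
secondsToNomDiffTime :: Double -> NominalDiffTime
secondsToNomDiffTime = realToFrac

main :: IO ()
main = print (secondsToNomDiffTime 1.5)
```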
Edit: thanks to Daniel Díaz Carrete, I adapted the pipes-parse-1.0 technique to handle upstream termination. A pipes-group solution should be possible using the same technique as well.
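
A small sketch of why the `Maybe` wrapper matters, assuming the pipes package is installed. `sumLost` and `sumOnEnd` are hypothetical pipes written only for this illustration: a pipe that accumulates state never gets a chance to flush when upstream simply stops, whereas with `wrap` it sees a `Nothing` and can yield its last aggregate.

```haskell
import Control.Monad (forever)
import Pipes
import qualified Pipes.Prelude as P

-- Same idea as wrap above: Just for real elements, then Nothing forever.
wrap :: Monad m => Producer a m r -> Producer (Maybe a) m r
wrap p = do
  _ <- p >-> P.map Just
  forever (yield Nothing)

-- Accumulates but has no way to notice that upstream is finished.
sumLost :: Monad m => Pipe Int Int m r
sumLost = go 0
  where
    go acc = do
      x <- await
      go (acc + x)

-- Flushes the running total when the end marker arrives.
sumOnEnd :: Monad m => Pipe (Maybe Int) Int m ()
sumOnEnd = go 0
  where
    go acc = do
      mx <- await
      case mx of
        Nothing -> yield acc
        Just x  -> go (acc + x)

main :: IO ()
main = do
  print (P.toList (each [1, 2, 3 :: Int] >-> sumLost))        -- []
  print (P.toList (wrap (each [1, 2, 3 :: Int]) >-> sumOnEnd)) -- [6]
```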