Question

Is it possible to create pipes that get all values that have been sent downstream in a certain time period? I'm implementing a server where the protocol allows me to concatenate outgoing packets and compress them together, so I'd like to effectively "empty out" the queue of downstream ByteStrings every 100ms and mappend them together to then yield on to the next pipe which does the compression.

Solution

Here's a solution using pipes-concurrency. You give it any Input and it will periodically drain the input of all values:

import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Data.Foldable (forM_)
import Pipes
import Pipes.Concurrent

drainAll :: Input a -> STM (Maybe [a])
drainAll i = do
    ma <- recv i
    case ma of
        Nothing -> return Nothing
        Just a  -> loop (a:)
  where
    loop diffAs = do
        ma <- recv i <|> return Nothing
        case ma of
            Nothing -> return (Just (diffAs []))
            Just a  -> loop (diffAs . (a:))

bucketsEvery :: Int -> Input a -> Producer [a] IO ()
bucketsEvery microseconds i = loop
  where
    loop = do
        lift $ threadDelay microseconds
        ma <- lift $ atomically $ drainAll i
        forM_ ma $ \a -> do
            yield a
            loop

This gives you much greater control over how you consume elements from upstream, by selecting the type of Buffer you use to build the Input.

If you're new to pipes-concurrency, you can read the tutorial which explains how to use spawn, Buffer and Input.
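
To make the wiring concrete, here is a minimal sketch of how bucketsEvery might be hooked up to a mailbox. It assumes pipes-concurrency 2.x, where spawn' returns a third component that seals the mailbox. The sample packets are made up, and BC.concat is only a stand-in for the real compression step. drainAll and bucketsEvery are repeated from above so that the file compiles on its own.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Applicative ((<|>))
import Control.Concurrent (forkIO, threadDelay)
import qualified Data.ByteString.Char8 as BC
import Data.Foldable (forM_)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Same definition as in the answer: block for the first value,
-- then take whatever else is already waiting in the mailbox.
drainAll :: Input a -> STM (Maybe [a])
drainAll i = do
    ma <- recv i
    case ma of
        Nothing -> return Nothing
        Just a  -> loop (a:)
  where
    loop diffAs = do
        ma <- recv i <|> return Nothing
        case ma of
            Nothing -> return (Just (diffAs []))
            Just a  -> loop (diffAs . (a:))

-- Same definition as in the answer: sleep, drain, yield, repeat.
bucketsEvery :: Int -> Input a -> Producer [a] IO ()
bucketsEvery microseconds i = loop
  where
    loop = do
        lift $ threadDelay microseconds
        ma <- lift $ atomically $ drainAll i
        forM_ ma $ \a -> do
            yield a
            loop

main :: IO ()
main = do
    -- An unbounded buffer never blocks the sender; other Buffer
    -- choices change how upstream is throttled.
    (output, input, seal) <- spawn' unbounded
    -- Illustrative upstream: three made-up packets, then seal the
    -- mailbox so that the consumer can terminate.
    _ <- forkIO $ do
        runEffect $ each ["GET ", "/index ", "HTTP/1.1"] >-> toOutput output
        atomically seal
    -- Every 100ms, concatenate whatever has arrived and print it.
    runEffect $ bucketsEvery 100000 input >-> P.map BC.concat >-> P.print
```

Since the sender finishes well within the first 100ms, this should normally print a single concatenated chunk and then exit once the sealed mailbox is empty. Swapping unbounded for a bounded buffer would make the sender wait when the consumer falls behind.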

OTHER TIPS

Here is a possible solution. It is based on a Pipe that tags ByteStrings going downstream with a Bool, in order to identify ByteStrings belonging to the same "time bucket".

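First, some imports. The OverloadedStrings extension is needed for the ByteString literals used in main further down:

{-# LANGUAGE OverloadedStrings #-}
import Data.AdditiveGroup
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Builder as BB  -- named Data.ByteString.Lazy.Builder in old bytestring releases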
import Data.Thyme.Clock
import Data.Thyme.Clock.POSIX
import Control.Monad.State.Strict
import Control.Lens (view)
import Control.Concurrent (threadDelay)
import Pipes
import Pipes.Lift
import qualified Pipes.Prelude as P
import qualified Pipes.Group as PG

Here is the tagging Pipe. It uses StateT internally:

tagger :: Pipe B.ByteString (B.ByteString,Bool) IO ()
tagger = do
    startTime <- liftIO getPOSIXTime
    evalStateP (startTime,False) $ forever $ do
        b <- await
        currentTime <- liftIO getPOSIXTime
        -- (POSIXTime,Bool) inner state
        (baseTime,tag) <- get
        if (currentTime ^-^ baseTime > timeLimit)
            then let tag' = not tag in
                 yield (b,tag') >> put (currentTime, tag')
            else yield $ (b,tag)
    where
        timeLimit = fromSeconds 0.1

Then we can use functions from the pipes-group package to group ByteStrings belonging to the same "time bucket" into lazy ByteStrings:

batch :: Producer B.ByteString IO () -> Producer BL.ByteString IO ()
batch producer =  PG.folds (<>) mempty BB.toLazyByteString
                . PG.maps (flip for $ yield . BB.byteString . fst)
                . view (PG.groupsBy $ \t1 t2-> snd t1 == snd t2)
                $ producer >-> tagger

It seems to batch correctly. This program:

main :: IO ()
main = do
    count <- P.length $ batch (yield "boo" >> yield "baa")
    print count
    count <- P.length $ batch (yield "boo" >> yield "baa"
                               >> liftIO (threadDelay 200000) >> yield "ddd")
    print count

Has the output:

1
2
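
To see the tagging and grouping logic without real clocks, here is a pure model of it that uses only base. The names tagByTime and buckets are hypothetical, timestamps are plain Doubles in seconds, and the base time starts at the first element's timestamp rather than at pipe start-up, which is a simplification of what tagger does.

```haskell
import Data.Function (on)
import Data.List (groupBy, mapAccumL)

-- Flip the Bool tag whenever more than `limit` seconds have passed
-- since the current base time, and move the base time forward.
tagByTime :: Double -> [(Double, a)] -> [(a, Bool)]
tagByTime _ [] = []
tagByTime limit xs@((t0, _) : _) = snd (mapAccumL step (t0, False) xs)
  where
    step (base, tag) (t, x)
        | t - base > limit = ((t, not tag), (x, not tag))
        | otherwise        = ((base, tag), (x, tag))

-- Adjacent elements with the same tag belong to the same bucket.
buckets :: Double -> [(Double, a)] -> [[a]]
buckets limit = map (map fst) . groupBy ((==) `on` snd) . tagByTime limit

main :: IO ()
main = do
    print (buckets 0.1 [(0.00, "boo"), (0.01, "baa")])
    -- prints [["boo","baa"]]
    print (buckets 0.1 [(0.00, "boo"), (0.01, "baa"), (0.25, "ddd")])
    -- prints [["boo","baa"],["ddd"]]
```

The one and two buckets here correspond to the counts 1 and 2 printed by the program above.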

Notice that the contents of a "time bucket" are only yielded when the first element of the next bucket arrives. They are not yielded automatically every 100ms. This may or may not be a problem for you. If you want to yield automatically every 100ms, you would need a different solution, possibly based on pipes-concurrency.

Also, you could consider working directly with the FreeT-based "effectful lists" provided by pipes-group. That way you could start compressing the data in a "time bucket" before the bucket is full.

Unlike Daniel's answer, mine does not tag the data as it is produced. It takes at least one element from upstream and then keeps aggregating further values in a monoid until the time interval has passed.

This code uses a list to aggregate, but there are better monoids to aggregate with:

import Pipes
import qualified Pipes.Prelude as P

import Data.Time.Clock
import Data.Time.Calendar

import Data.Time.Format

import Data.Monoid

import Control.Monad

-- taken from pipes-rt
doubleToNomDiffTime :: Double -> NominalDiffTime
doubleToNomDiffTime x =
  let d0 = ModifiedJulianDay 0
      t0 = UTCTime d0 (picosecondsToDiffTime 0)
      t1 = UTCTime d0 (picosecondsToDiffTime $ floor (x/1e-12))
  in  diffUTCTime t1 t0

-- Adapted from pipes-parse-1.0
wrap
  :: Monad m =>
     Producer a m r -> Producer (Maybe a) m r
wrap p = do
  p >-> P.map Just
  forever $ yield Nothing

yieldAggregateOverTime
  :: (Monoid y,  -- Monoid constraint so we can aggregate
      MonadIO m  -- the base monad needs access to IO
                 -- to be able to get the current time
     ) =>
     (t -> y) -- Convert an element from upstream to the monoid
  -> Double -- Time in seconds to aggregate over
  -> Pipe (Maybe t) y m ()
yieldAggregateOverTime toMonoid period = do
  t0 <- liftIO getCurrentTime
  loop mempty (dtUTC `addUTCTime` t0)
  where
    dtUTC = doubleToNomDiffTime period
    loop m ts = do
      t <- liftIO getCurrentTime
      v0 <- await -- await at least one element
      case v0 of
        Nothing -> yield m -- upstream finished: flush what is left
        Just v -> do
          if t > ts
          then do
            yield (m <> toMonoid v)
            loop mempty (dtUTC `addUTCTime` ts)
          else do
            loop (m <> toMonoid v) ts


main = do
  runEffect $  wrap (each [1..]) >-> yieldAggregateOverTime (\x -> [x]) (0.0001)
                            >-> P.take 10 >-> P.print

Depending on CPU load, the output data will be aggregated differently, with at least one element in each chunk.

$ ghc Main.hs -O2
$ ./Main
[1,2]
[3]
[4]
[5]
[6]
[7]
[8]
[9]
[10]
[11]

$ ./Main
[1,2]
[3]
[4]
[5]
[6,7,8,9,10]
[11,12,13,14,15,16,17,18]
[19,20,21,22,23,24,25,26]
[27,28,29,30,31,32,33,34]
[35,36,37,38,39,40,41,42]
[43,44,45,46,47,48,49,50]

$ ./Main
[1,2,3,4,5,6]
[7]
[8]
[9,10,11,12,13,14,15,16,17,18,19,20]
[21,22,23,24,25,26,27,28,29,30,31,32,33]
[34,35,36,37,38,39,40,41,42,43,44]
[45,46,47,48,49,50,51,52,53,54,55]
[56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72]
[73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88]
[89,90,91,92,93,94,95,96,97,98,99,100,101,102,103]

$ ./Main
[1,2,3,4,5,6,7]
[8]
[9]
[10,11,12,13,14,15,16,17,18]
[19,20,21,22,23,24,25,26,27]
[28,29,30,31,32,33,34,35,36,37]
[38,39,40,41,42,43,44,45,46]
[47,48,49,50]
[51,52,53,54,55,56,57]
[58,59,60,61,62,63,64,65,66]
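
As an example of a monoid better suited to the original ByteString use case than a list, here is a small sketch that uses the Builder type from the bytestring package. The helper names toChunk and finishBucket are hypothetical and the sample strings are made up.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as BB
import qualified Data.ByteString.Lazy as BL

-- Wrap a strict chunk in a Builder; appending Builders is cheap.
toChunk :: B.ByteString -> BB.Builder
toChunk = BB.byteString

-- Materialise a finished bucket as one lazy ByteString.
finishBucket :: BB.Builder -> BL.ByteString
finishBucket = BB.toLazyByteString

main :: IO ()
main = do
    let bucket = foldMap toChunk ["boo", "baa", "ddd"]
    print (finishBucket bucket)  -- prints "boobaaddd"
```

In the pipeline above this would presumably mean passing BB.byteString as the first argument of yieldAggregateOverTime and adding P.map BB.toLazyByteString downstream, before the compression step.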

You might want to look at the source code of pipes-rt; it shows one approach to dealing with time in pipes.
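
On the time handling itself: NominalDiffTime is a Fractional type, so a Double number of seconds can also be converted with realToFrac instead of going through two UTCTime values. This is a sketch of that alternative, not what pipes-rt does; secondsToNominal and deadlineAfter are hypothetical names.

```haskell
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)

-- Convert seconds to a NominalDiffTime via its Fractional instance.
secondsToNominal :: Double -> NominalDiffTime
secondsToNominal = realToFrac

-- The end of an aggregation window that starts at t0.
deadlineAfter :: Double -> UTCTime -> UTCTime
deadlineAfter period t0 = secondsToNominal period `addUTCTime` t0

main :: IO ()
main = do
    t0 <- getCurrentTime
    print (secondsToNominal 0.1)
    print (deadlineAfter 0.1 t0 > t0)
```

Values smaller than a picosecond are lost in the conversion, which should not matter for windows in the millisecond range.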

Edit: thanks to Daniel Díaz Carrete, I adapted the pipes-parse-1.0 technique to handle upstream termination. A pipes-group solution should be possible using the same technique as well.

Licensed under: CC-BY-SA with attribution