Question

Using the pipes library, I want to write a program to read data from some source and accumulate it monoidally (say, with Sum). The easiest way to do this would be,

 import Control.Proxy
 import Data.Monoid (Sum(..))

 main = do
     let source = enumFromToS (0::Int) 5
     a <- runWriterT $ runProxy $ source >-> foldD Sum
     print a

Of course, while this works for small sources, large inputs will result in the dreaded stack overflow due to the lazy nature of WriterT's accumulator.

Thankfully, it seems that pipes anticipates this, providing a WriterP proxy with a strict accumulator. Unfortunately, the documentation surrounding this proxy is pretty sparse. After a bit of poking around (and simplifying the problem to instead accumulate a 1 for each downstream element), I came to this program,

import Control.Proxy
import Control.Proxy.Trans.Writer
import Data.Monoid (Sum(..))

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ runWriterK $ source >-> \x->tell (Sum 1::Sum Int)
    print a

Of course, this program doesn't even perform the simplified task correctly as it accumulates to 1 instead of 6. If I'm not mistaken, this behavior is explained by the fact that the pipe only reads one element before terminating. To repeat until the end of the input, I came up with the following,

import Control.Proxy
import Control.Proxy.Trans.Writer
import Data.Monoid (Monoid, Sum(..))

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ runWriterK $ source >-> fold Sum
    print a

fold :: (Monad m, Proxy p, Monoid w) => (a -> w) -> a' -> WriterP w p a' a a' a m r
fold f = go
  where go x = do a <- request x
                  tell $ f a
                  x' <- respond a
                  go x'

This code, however, returns an accumulator of 0. Why is this? Is there a function like my fold provided in pipes?

Given that many use-cases for pipes are long-running processes working with large data-sets, would it not make sense for the folds in Control.Proxy.Prelude to be built around the strict WriterP instead of WriterT? Currently it feels like the proxy transformers in pipes are second-class citizens, present but lacking many of the combinators that make WriterT so handy.


Solution

I'm adding a new answer because I've fixed this issue in pipes-3.3, which I just uploaded to Hackage. The theory behind pipes shows that the global behavior you were expecting was the right behavior all along, and WriterP now behaves globally so you can fold within a pipe.

I've modified your example to show how you would implement it using pipes-3.3:

import Control.Proxy
import Control.Proxy.Trans.Writer

main = do
    let source = enumFromToS (0::Int) 5
    a <- runProxy $ execWriterK $ source >-> sumD
    print a

You can also now retrieve the results of a fold within a pipeline. For example, this is perfectly valid:

chunksOf :: (Monad m, Proxy p) => Int -> () -> Pipe p a [a] m r
chunksOf n () = runIdentityP $ forever $ do
    -- The unitU discards the values that 'toListD' reforwards
    as <- execWriterK (takeB_ n >-> toListD >-> unitU) ()
    respond as

Here's an example usage:

>>> runProxy $ getLineS >-> takeWhileD (/= "quit") >-> chunksOf 3 >-> printD
1<Enter>
2<Enter>
3<Enter>
["1","2","3"]
4<Enter>
5<Enter>
6<Enter>
["4","5","6"]
quit
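
For comparison, the grouping that chunksOf performs in the session above can be sketched on plain lists using only base. This is an illustration of the intended output, not the pipes implementation; the name chunksOfList is hypothetical.

```haskell
-- Hypothetical list analogue of the chunksOf pipe (base only).
chunksOfList :: Int -> [a] -> [[a]]
chunksOfList n xs
    | n <= 0    = []   -- guard: a non-positive size would never consume input
    | null xs   = []
    | otherwise = chunk : chunksOfList n rest
  where
    (chunk, rest) = splitAt n xs

main :: IO ()
main = do
    -- Mirror the session: read entries until "quit", then group by 3.
    let entered = takeWhile (/= "quit") ["1", "2", "3", "4", "5", "6", "quit"]
    mapM_ print (chunksOfList 3 entered)
```

One difference to keep in mind: this list version also emits a shorter final chunk when the input length is not a multiple of n, which the session above does not demonstrate for the pipe version.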

Sorry for getting the answer wrong the first time!

OTHER TIPS

Remember that proxy transformers behave locally, whereas the base monad behaves globally. That means that with WriterP each proxy maintains its own accumulator, and the proxy that terminates first determines which accumulator is returned. Your accumulator came back as 0 because the enumerating pipe returned first, and it had not accumulated anything.

The WriterP fold is strict only because I have control over that type (and not the one in transformers). I never intended it to be the strict alternative to the lazy folds in the proxy prelude. The correct strict alternative to use is foldlD'.

Note that foldlD' mappend is basically a strict Writer fold, particularly if you run the base State monad using mempty as the initial state.

In your case, you can do it even more easily using:

main = do
    let source = enumFromToS (0::Int) 5
    a <- (`runStateT` 0) $ runProxy $ source >-> foldlD' (+)
    print a

That will fold the input strictly.
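
To see the same idea outside of pipes, here is a minimal sketch using only base: a strict left fold that maps each element into a monoid and starts from mempty, which is roughly what the foldlD' mappend description above amounts to on a plain list. The helper name foldMapStrict is hypothetical and is not part of pipes.

```haskell
import Data.List (foldl')
import Data.Monoid (Sum (..))

-- Hypothetical helper (not part of pipes): map each element into a monoid
-- and combine from the left, forcing the accumulator at every step.
foldMapStrict :: Monoid w => (a -> w) -> [a] -> w
foldMapStrict f = foldl' (\acc x -> acc `mappend` f x) mempty

main :: IO ()
main = do
    -- Same source as the examples above: 0 through 5.
    print (getSum (foldMapStrict Sum [0 .. 5 :: Int]))  -- prints 15
    -- A larger input; foldl' keeps the running total evaluated,
    -- so no chain of unevaluated additions builds up.
    print (getSum (foldMapStrict Sum [1 .. 1000000 :: Int]))
```

Because Sum is a newtype, forcing the accumulator to weak head normal form also forces the underlying Int, which is why foldl' is enough here.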

Licensed under: CC-BY-SA with attribution