Question

I have been experimenting with the new pipes-http package and I had a thought. I have two parsers for a web page, one that returns line items and another a number from elsewhere in the page. When I grab the page, it'd be nice to string these parsers together and get their results at the same time from the same bytestring producer, rather than fetching the page twice or fetching all the html into memory and parsing it twice.

In other words, say you have two Consumers:

c1 :: Consumer a m r1
c2 :: Consumer a m r2

Is it possible to make a function like this:

combineConsumers :: Consumer a m r1 -> Consumer a m r2 -> Consumer a m (r1, r2)
combineConsumers = undefined

I have tried a few things, but I can't figure it out. I understand if it isn't possible, but it would be convenient.

Edit:

I'm sorry, it turns out I was making an assumption about pipes-attoparsec, based on my experience with conduit-attoparsec, that caused me to ask the wrong question. Pipes-attoparsec turns an attoparsec parser into a pipes Parser, whereas I had assumed it would return a pipes Consumer. That means I can't actually turn two attoparsec parsers into consumers that take text and return a result, and then use them with the plain old pipes ecosystem. I'm sorry, but I just don't understand pipes-parse.

Even though it doesn't help me, Arthur's answer is pretty much what I envisioned when I asked the question, and I'll probably end up using his solution in the future. In the meantime I'm just going to use conduit.


Solution 2

I think something is wrong with the way you are going about this, for the reasons Davorak mentions in his remark. But if you really need such a function, you can define it.

import Pipes.Internal
import Pipes.Core

zipConsumers :: Monad m => Consumer a m r -> Consumer a m s -> Consumer a m (r,s)
zipConsumers p q = go (p,q) where
  go (p,q) = case (p,q) of 
     (Pure r     , Pure s)      -> Pure (r,s)
     (M mpr      , ps)          -> M (do pr <- mpr
                                         return (go (pr, ps)))
     (pr         , M mps)       -> M (do ps <- mps
                                         return (go (pr, ps)))
     (Request _ f, Request _ g) -> Request () (\a -> go (f a, g a))
     (Request _ f, Pure s)      -> Request () (\a -> do r <- f a
                                                        return (r, s))
     (Pure r     , Request _ g) -> Request () (\a -> do s <- g a
                                                        return (r,s))
     (Respond x _, _          ) -> closed x
     (_          , Respond y _) -> closed y

If you are 'zipping' consumers only for their 'effects', without using their return values, you can just use P.tee consumer1 >-> consumer2 (tee comes from Pipes.Prelude).
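As a rough illustration of the effects-only case, the sketch below feeds one producer to two consumers through P.tee. The names teeBoth, sumConsumer and countConsumer are made up for this example, and the IORefs are only one way to observe that both consumers saw every element.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical example: run two effectful consumers over one stream.
-- Their return values are ignored; only their effects (updating the
-- IORefs) are observed afterwards.
teeBoth :: IO (Int, Int)
teeBoth = do
  total <- newIORef 0
  count <- newIORef 0
  let sumConsumer   = P.mapM_ (\x -> modifyIORef' total (+ x))
      countConsumer = P.mapM_ (\_ -> modifyIORef' count (+ 1))
  -- tee passes each element to sumConsumer and forwards it downstream
  runEffect $ each [1 .. 10] >-> P.tee sumConsumer >-> countConsumer
  (,) <$> readIORef total <*> readIORef count

main :: IO ()
main = teeBoth >>= print
```

Both consumers see all ten elements here, so the pair holds the sum and the count of the stream.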

OTHER TIPS

If the results are "monoidal", you can use the tee function from the Pipes prelude, in combination with a WriterT.

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import Control.Monad
import Control.Monad.Writer
import Control.Monad.Writer.Class
import Pipes
import qualified Pipes.Prelude as P
import qualified Data.Text as T

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

counter :: Monoid w => T.Text 
                    -> (T.Text -> w) 
                    -> Consumer T.Text (WriterT w IO) ()
counter word inject = P.filter (==word) >-> P.mapM (tell . inject) >-> P.drain

main :: IO ()
main = do
    result <-runWriterT $ runEffect $ 
        hoist lift textSource >-> 
        P.tee (counter "foo" inject1) >-> (counter "bar" inject2)
    putStrLn . show $ result
    where
    inject1 _ = (,) (Sum 1) mempty
    inject2 _ = (,) mempty (Sum 1)

Update: As mentioned in a comment, the real problem I see is that, in pipes, parsers aren't Consumers. And how can you run two parsers concurrently if they have different behaviours regarding leftovers? What happens if one of the parsers wants to "un-draw" some text and the other parser doesn't?

One possible solution is to run the parsers in a truly concurrent manner, in different threads. The primitives in the pipes-concurrency package let you "duplicate" a Producer by writing the same data to two different mailboxes. And then each parser can do whatever it wants with its own copy of the producer. Here's an example which also uses the pipes-parse, pipes-attoparsec and async packages:

{-# LANGUAGE OverloadedStrings #-}

import Data.Monoid
import qualified Data.Text as T
import Data.Attoparsec.Text hiding (takeWhile)
import Data.Attoparsec.Combinator
import Control.Applicative
import Control.Monad
import Control.Monad.State.Strict
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.Attoparsec as P
import qualified Pipes.Concurrent as P
import qualified Control.Concurrent.Async as A

parseChars :: Char -> Parser [Char] 
parseChars c = fmap mconcat $ 
    many (notChar c) *> many1 (some (char c) <* many (notChar c))

textSource :: Producer T.Text IO ()
textSource = yield "foo" >> yield "bar" >> yield "foo" >> yield "nah"

parseConc :: Producer T.Text IO () 
          -> Parser a 
          -> Parser b 
          -> IO (Either P.ParsingError a,Either P.ParsingError b)
parseConc producer parser1 parser2 = do
    (outbox1,inbox1,seal1) <- P.spawn' P.Unbounded
    (outbox2,inbox2,seal2) <- P.spawn' P.Unbounded
    feeding <- A.async $ runEffect $ producer >-> P.tee (P.toOutput outbox1) 
                                              >->        P.toOutput outbox2
    sealing <- A.async $ A.wait feeding >> P.atomically seal1 >> P.atomically seal2
    r <- A.runConcurrently $ 
        (,) <$> (A.Concurrently $ parseInbox parser1 inbox1)
            <*> (A.Concurrently $ parseInbox parser2 inbox2)
    A.wait sealing
    return r 
    where
    parseInbox parser inbox = evalStateT (P.parse parser) (P.fromInput inbox)

main :: IO ()
main = do
    (Right a, Right b) <- parseConc textSource (parseChars 'o')  (parseChars 'a')
    putStrLn . show $ (a,b) 

The result is:

("oooo","aa")

I'm not sure how much overhead this approach introduces.

The idiomatic solution is to rewrite your Consumers as a Fold or FoldM from the foldl library and then combine them using Applicative style. You can then convert this combined fold to one that works on pipes.

Let's assume that you either have two Folds:

fold1 :: Fold a r1
fold2 :: Fold a r2

... or two FoldMs:

foldM1 :: Monad m => FoldM m a r1
foldM2 :: Monad m => FoldM m a r2

Then you combine these into a single Fold/FoldM using Applicative style:

import Control.Applicative

foldBoth :: Fold a (r1, r2)
foldBoth = (,) <$> fold1 <*> fold2

foldBothM :: Monad m => FoldM m a (r1, r2)
foldBothM = (,) <$> foldM1 <*> foldM2

-- or: foldBoth  = liftA2 (,) fold1  fold2
--     foldBothM = liftA2 (,) foldM1 foldM2
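To see why the Applicative combination makes a single pass over the input, here is a small stand-in for Fold that uses only base. It mirrors the shape of the foldl type (a step function, an initial state and an extraction function), but sumF, lengthF, runFold and combined are names invented for this sketch and it is not the library's actual source.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
import Data.List (foldl')

-- A minimal stand-in for foldl's Fold: step function, initial
-- accumulator, and a final extraction function.
data Fold a b = forall x. Fold (x -> a -> x) x (x -> b)

instance Functor (Fold a) where
  fmap f (Fold step begin done) = Fold step begin (f . done)

instance Applicative (Fold a) where
  pure b = Fold (\() _ -> ()) () (\() -> b)
  Fold stepL beginL doneL <*> Fold stepR beginR doneR =
    Fold step (beginL, beginR) done
    where
      -- both accumulators advance on every element: one traversal
      step (l, r) a = (stepL l a, stepR r a)
      done (l, r)   = doneL l (doneR r)

-- Run a fold over a list (a list stands in for the producer here).
runFold :: Fold a b -> [a] -> b
runFold (Fold step begin done) = done . foldl' step begin

sumF :: Num a => Fold a a
sumF = Fold (+) 0 id

lengthF :: Fold a Int
lengthF = Fold (\n _ -> n + 1) 0 id

combined :: (Int, Int)
combined = runFold ((,) <$> sumF <*> lengthF) [1 .. 10]

main :: IO ()
main = print combined  -- (55,10)
```

The combined fold keeps a pair of accumulators and updates both on each element, which is what lets two results come out of one pass.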

You can turn either fold into a Pipes.Prelude-style fold or a Parser. Here are the necessary conversion functions:

import Control.Foldl (purely, impurely)
import qualified Pipes.Prelude as Pipes
import qualified Pipes.Parse   as Parse

purely Pipes.fold
    :: Monad m => Fold a b -> Producer a m () -> m b

impurely Pipes.foldM
    :: Monad m => FoldM m a b -> Producer a m () -> m b

purely Parse.foldAll
    :: Monad m => Fold a b -> Parser a m b

impurely Parse.foldMAll
    :: Monad m => FoldM m a b -> Parser a m b
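Putting the pieces together, a minimal end-to-end sketch might look like the following. It assumes the foldl and pipes packages are installed; foldBothOverPipe is a name made up for this example, and L.sum and L.length stand in for whatever two folds you actually need.

```haskell
import qualified Control.Foldl as L
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical example: combine two folds with Applicative style and
-- run the combined fold over a single producer in one pass.
foldBothOverPipe :: IO (Int, Int)
foldBothOverPipe =
  L.purely P.fold ((,) <$> L.sum <*> L.length) (each [1 .. 10])

main :: IO ()
main = foldBothOverPipe >>= print  -- (55,10)
```

The producer is consumed once, and both results come back as a pair.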

The reason for the purely and impurely functions is so that foldl and pipes can interoperate without either one incurring a dependency on the other. Also, they allow libraries other than pipes (like conduit) to reuse foldl without a dependency, too (Hint hint, @MichaelSnoyman).

I apologize that this feature is not documented, mainly because it took me a while to figure out how to get pipes and foldl to interoperate in a dependency-free manner, and that was after I wrote the pipes tutorial. I will update the tutorial to point out this trick.

To learn how to use foldl, just read the documentation in the main module. It's a very small and easy-to-learn library.

For what it's worth, in the conduit world, the relevant function is zipSinks. There might be some way to adapt this function to work for pipes, but automatic termination may get in the way.

Consumer forms a Monad so

combineConsumers = liftM2 (,)

will type check. Unfortunately, the semantics might be unlike what you're expecting: the first consumer will run to completion and then the second.
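A small sketch makes the sequencing visible. Here takeTwo and sequentialResult are names invented for the example. The producer is given a dummy return value so that its return type matches the consumer's.

```haskell
import Control.Monad (liftM2, replicateM)
import Pipes

-- Hypothetical consumer that awaits two values and returns them.
takeTwo :: Monad m => Consumer Int m [Int]
takeTwo = replicateM 2 await

-- liftM2 (,) sequences the consumers: the first one takes its input
-- and finishes before the second one sees anything.
sequentialResult :: IO ([Int], [Int])
sequentialResult =
  runEffect $ (each [1 .. 4] >> return ([], [])) >-> liftM2 (,) takeTwo takeTwo

main :: IO ()
main = sequentialResult >>= print  -- ([1,2],[3,4])
```

The second consumer receives 3 and 4 rather than 1 and 2, so the two consumers split the stream instead of sharing it.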

Licensed under: CC-BY-SA with attribution