Question

I want to do something along the lines of ArrowChoice, but with conduits. I want to await an Either value and then pass Left values to one conduit and Right values to another, and then merge the results back into an Either stream.

Presumably this can be done by making the inner conduits like automata: turn a conduit into a function that takes an argument and returns a monadic list of outputs yielded:

newtype AutomataM i m o = Automata (i -> m (o, Automata i o))

conduitStep :: Conduit i m o -> AutomataM i m [o]

The reason for the list of outputs is that a Conduit may yield 0 or more outputs for each input.

I've looked at ResumableConduit and its relatives, and presumably the answer is in there somewhere. But I can't quite see how its done.

Was it helpful?

Solution

It's not exactly the same type signature you provided, but:

import Data.Conduit
import Data.Conduit.Internal (Pipe (..), ConduitM (..))

newtype Automata i o m r = Automata (m ([o], Either r (i -> Automata i o m r)))

conduitStep :: Monad m => ConduitM i o m r -> Automata i o m r
conduitStep (ConduitM con0) =
    Automata $ go [] id con0
  where
    go _ front (Done r) = return (front [], Left r)
    go ls front (HaveOutput p _ o) = go ls (front . (o:)) p
    go ls front (NeedInput p _) =
        case ls of
            [] -> return (front [], Right $ conduitStep . ConduitM . p)
            l:ls' -> go ls' front (p l)
    go ls front (PipeM mp) = mp >>= go ls front
    go ls front (Leftover p l) = go (l:ls) front p

But just be careful with this approach:

  1. By keeping the output as a list, it's not constant memory.
  2. We're throwing away finalizers.

There's probably a way to provide a ZipConduit abstraction, similar to ZipSource and ZipSink, that would handle this kind of problem more elegantly, but I haven't thought about it too much.


EDIT I ended up implementing ZipConduit in conduit-extra 0.1.5. Here's a demonstration of using it which sounds a bit like your case:

import           Control.Applicative
import           Data.Conduit
import           Data.Conduit.Extra
import qualified Data.Conduit.List   as CL

conduit1 :: Monad m => Conduit Int m String
conduit1 = CL.map $ \i -> "conduit1: " ++ show i

conduit2 :: Monad m => Conduit Double m String
conduit2 = CL.map $ \d -> "conduit2: " ++ show d

conduit :: Monad m => Conduit (Either Int Double) m String
conduit = getZipConduit $
    ZipConduit (lefts =$= conduit1) *>
    ZipConduit (rights =$= conduit2)
  where
    lefts = CL.mapMaybe (either Just (const Nothing))
    rights = CL.mapMaybe (either (const Nothing) Just)

main :: IO ()
main = do
    let src = do
            yield $ Left 1
            yield $ Right 2
            yield $ Left 3
            yield $ Right 4
        sink = CL.mapM_ putStrLn
    src $$ conduit =$ sink

OTHER TIPS

There's a folk method of doing this using pipes by using "push-category" Pipes. The complete implementation comes from both this mailing list post and this Stack Overflow answer. I think it hasn't been released yet due to both an effort to simplify the Pipes interface, a focus on using the "sequencing" monad instance which is hidden via this method, and no proof yet that this implementation truly implements the Arrow class properly.

The idea is to implement a newtype Edge (demonstrated below) which is a push-based pipe with the type arguments in the right order for Category, Arrow, ArrowChoice and both Functor and Applicative over their output values. This lets you compose them into directed acyclic graphs using arrow notation. I'll run over the implementation below, but it's safe to just ignore it and use the Arrow/ArrowChoice/Applicative instances of Edge without too much concern.

(Edit: This code is best made available at https://github.com/Gabriel439/Haskell-RCPL-Library)


{-# LANGUAGE RankNTypes           #-}
{-# LANGUAGE TypeSynonymInstances #-}

import Prelude hiding ((.), id)
import Pipes.Core
import Pipes.Lift
import Control.Monad.Morph
import Control.Category
import Control.Monad.State.Strict
import Control.Arrow

This is an atypical mode of using pipes and isn't exposed in the Pipes module; you must import Pipes.Core to use push. Push-based pipes look like

-- push :: a -> Proxy a' a a' a m r

and thus they demand at least one upstream value before the Proxy is allowed to run. This means the whole process needs to be "kickstarted" by passing the first value as a function call and that the leftmost push-Proxy will control the entire stream.

Given a push-based pipe we can implement Category, Arrow and ArrowChoice. The standard solution also involves the Edge typeclass so that we have the type arguments in the right order for Category and Arrow

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

For the Category instance, we use the "push" Category which has push as id and (<~<) as composition:

instance Monad m => Category (Edge m r) where
  id = Edge push
  Edge a . Edge b = Edge (a <~< b)

We embed functions into Edge with arr by augmenting id (i.e. push) on the downward edge. To do this we use the respond category which has the law p />/ respond == p, but jam our f into the process.

instance Monad m => Arrow (Edge m r) where
  arr f = Edge (push />/ respond . f)

We also use a local state transformer to store the snd half of our pairs and pass it "around" the input pipe in first

  first (Edge p) = Edge $ \(b, d) ->
    evalStateP d $ (up \>\ hoist lift . p />/ dn) b
    where
      up () = do
        (b, d) <- request ()
        lift (put d)
        return b
      dn c = do
        d <- lift get
        respond (c, d)

Finally, we get an ArrowChoice instance by implementing left. To do so we split the burden of passing the Left and Right sides using either the return or the pipe to pass values.

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

We can use Edge to create "push-based" producers and consumers

type PProducer m r b =            Edge m r () b
type PConsumer m r a = forall b . Edge m r a  b

and then we'll provide Functor and Applicative instances for PProducer. This goes by case analysis on the underlying Pipe, so it's a bit verbose. Essentially, however, all that happens is that we insert f into the yield slot of the Pipe.

instance Functor (PProducer m r) where
  fmap f (Edge k) = $ Edge $ \() -> go (k ()) where
    go p = case p of
      Request () ku -> Request ()    (\() -> go (ku ()))
      -- This is the only interesting line
      Respond b  ku -> Respond (f b) (\() -> go (ku ()))
      M          m  -> M (m >>= \p' -> return (go p'))  
      Pure    r     -> Pure r

Finally, Applicative is much the same except that we have to switch between running the upstream pipe to produce functions and running the downstream pipe to produce arguments.

instance (Monad m) => Applicative (Edge m r ()) where
    pure b = Edge $ \() -> forever $ respond b
    (Edge k1) <*> (Edge k2) = Edge (\() -> goL (k1 ()) (k2 ()))
      where
        goL p1 p2 = case p1 of
            Request () ku -> Request () (\() -> goL   (ku ()) p2)
            Respond f  ku ->                    goR f (ku ()) p2
            M          m  -> M (m >>= \p1' -> return (goL p1' p2))
            Pure    r     -> Pure r
        goR f p1 p2 = case p2 of
            Request () ku -> Request ()    (\() -> goR f p1 (ku ()))
            Respond x  ku -> Respond (f x) (\() -> goL   p1 (ku ()))
            M          m  -> M (m >>= \p2' -> return (goR f p1 p2'))
            Pure    r     -> Pure r
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top