Question

I am having a look at the pipes 3.0 package for stream processing. The tutorial is very well done and very clear, except that I cannot wrap my head around the "zip and merge" section.

My goal is to combine pipes somewhat the way ArrowChoice allows:

  • I have a single producer of Either a a
  • I would like to apply a first pipe to Left values and another one to Right values
  • I would then like to merge the results, and continue piping


+----------+                   +------+ - filterLeft ->  pipe1 -> +------------+ 
| producer | - (Either a a) -> | fork |                           | mergeD (?) |
+----------+                   +------+ - filterRight -> pipe2 -> +------------+

I define fork like in the tutorial:

fork () = 
    runIdentityP . hoist (runIdentityP . hoist runIdentityP) $ forever $ do
        a <- request ()
        lift $ respond a
        lift $ lift $ respond a

oddOrEven x = if odd x then Left x else Right x
producer = fromListS [1..10] >-> mapD oddOrEven
isLeft (Left _) = True
isLeft (Right _) = False
isRight = not . isLeft
filterLeft = filterD isLeft
filterRight = filterD isRight
pipe1 = mapD (\x -> ("seen on left", x))
pipe2 = mapD (\x -> ("seen on right", x))

p1 = producer >-> fork    

The problem is that I cannot make the types right. The tutorial seems only to show how to run the inner (lifted) pipe chain as a self contained session, but I would like to be able to reinject its values to the pipe, not just apply an effect on them. I of course tried to follow the types, but they get a bit hairy very quickly.

Can anyone help me with this? Thanks in advance.

(PS: an example of this kind of topology would be a nice addition to the tutorial, or even better a section on how to emulate the Control.Arrow stuff using pipes)


Solution

The pipe abstraction does not support diamond topologies or any form of Arrow-like behavior. This is not an API issue, but rather there is no correct or well-defined behavior for such a scenario.

To explain why, allow me to simplify your diagram to the following one:

          +----+
          | pL |
+----+ => +----+ => +----+
| p1 |              | p2 |
+----+ => +----+ => +----+
          | pR |
          +----+

Imagine we are at the p1 pipe and we respond to pL. If you remember the tutorial, the proxy laws require that every respond blocks until the downstream pipe requests again. That means that p1 cannot regain control until pL requests again. So at this point we have:

  • p1 blocked waiting for a request from pL

However, suppose that pL does not request yet and instead responds with a value of its own to p2. So now we have:

  • p1 blocked waiting for a request from pL
  • pL blocked waiting for a request from p2

Now suppose that p2 instead requests from pR. The proxy laws say that p2 cannot regain control until pR responds again. Now we have:

  • p1 blocked waiting for a request from pL
  • pL blocked waiting for a request from p2
  • p2 blocked waiting for a respond from pR

Now what happens when pR requests a value from p1? If we consult our list of blocks, p1 is still blocked waiting for a request from pL, so it is in no shape to receive a request from pR. There is no correct way to "tie the knot", so to speak, even if pL and pR shared the same request signature.

More generally, the proxy laws ensure the following two invariants:

  • Every pipe "upstream" of the active pipe will be blocked on a respond
  • Every pipe "downstream" of the active pipe will be blocked on a request

Cycles or diamonds break these invariants. This is why the tutorial very briefly remarks in passing that cyclic topologies do not "make sense".

You can see why diamonds break this invariant in the example I just gave you. When p1 had control it was upstream of pR, which would imply pR was blocked on a request. However, when p2 gained control it was downstream of pR, which would imply pR was blocked on a respond. This leads to a contradiction, because pR couldn't have changed yet since control flowed through pL and not pR to get to p2.

Machines

So there are two solutions to your problem. One solution is to inline your desired splitting behavior into a single pipe: you define a pE pipe that combines the behavior of pL and pR.
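As a rough illustration of the inlining idea, here is a minimal sketch that uses only base and the names from the question. The function pE is written as a plain function rather than a proxy, and the handlers are the question's pipe1 and pipe2 bodies; assuming mapD behaves as it does in the question's code, mapD pE would then stand in for the whole fork/filter/merge diamond.

```haskell
-- Tag each number by parity, as in the question.
oddOrEven :: Int -> Either Int Int
oddOrEven x = if odd x then Left x else Right x

-- pE does the work of both branches at once: 'either' picks the
-- handler matching the constructor, so no fork or merge is needed.
pE :: Either Int Int -> (String, Int)
pE = either (\x -> ("seen on left", x)) (\x -> ("seen on right", x))

-- Stand-in for the pipeline: feed a few values through both stages.
main :: IO ()
main = mapM_ (print . pE . oddOrEven) [1 .. 4]
```

This only covers stateless branches. If pL or pR needs its own state or effects, the combined pipe has to be written by hand with request and respond instead.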

The more elegant solution to this problem is something in the style of Edward's machines. You define a more restricted abstraction that is less powerful than proxies but supports ArrowChoice, you do your arrow-ish stuff within the domain of that abstraction, and then when you are done you upgrade it to proxies.

If you squint, you could pretend that there is a category of currently available coroutine abstractions in Haskell that is a partial order. Coroutine abstractions are the objects, and an arrow from coroutine abstraction C1 to coroutine abstraction C2 means that you can embed coroutines of type C1 in coroutines of type C2 (i.e. C1 is an improper subset of C2).

In this partial order, proxies would probably be the terminal object, meaning that you can think of proxies as the assembly language of coroutines. Following the analogy of assembly language, proxies provide fewer guarantees, but you can embed more restrictive coroutine abstractions (i.e. higher-level languages) within proxies. These higher-level languages impose greater restrictions, which enable more powerful abstractions (i.e. an Arrow instance).

If you want a trivial example of this, consider one of the simplest coroutine abstractions: the Kleisli arrow:

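import Prelude hiding (id, (.))
import Control.Category (Category (..))
import Control.Monad ((<=<))

newtype Kleisli m a b = Kleisli { runKleisli :: a -> m b }

-- The Monad constraint is needed for return and (<=<)
instance Monad m => Category (Kleisli m) where
    id = Kleisli return
    (Kleisli f) . (Kleisli g) = Kleisli (f <=< g)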

Kleisli arrows are definitely more restrictive than proxies, but because of this restriction they support an Arrow instance. So whenever you need an Arrow instance you write your code using Kleisli arrows, and combine it using Arrow notation, and then when you are done, you can "compile" that higher-level Kleisli code to the proxy assembly code using mapMD:

kleisliToProxy :: (Monad m, Proxy p) => Kleisli m a b -> () -> Pipe p a b m r
kleisliToProxy (Kleisli f) = mapMD f

This compilation obeys the functor laws:

kleisliToProxy id = idT

kleisliToProxy (f . g) = kleisliToProxy f <-< kleisliToProxy g

So if your branching code can be written in terms of Kleisli arrows, then use Kleisli arrows for that section of the code and then compile it down to proxies when you are done. Using this trick, you can compile multiple coroutine abstractions down to the proxy abstraction to mix them.
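To make this concrete for the question's topology, here is a minimal sketch using the Kleisli type and ArrowChoice instance that ship with base in Control.Arrow. The name branch is made up for illustration, and pipe1 and pipe2 are the question's two branches rewritten as Kleisli arrows over IO; the sketch stops short of the proxy step, where the resulting arrow would be handed to kleisliToProxy as defined above.

```haskell
import Control.Arrow (Kleisli (..), arr, (>>>), (|||))

-- Tag each number by parity, as in the question.
oddOrEven :: Int -> Either Int Int
oddOrEven x = if odd x then Left x else Right x

-- The two branches of the diamond, written as Kleisli arrows.
pipe1, pipe2 :: Kleisli IO Int (String, Int)
pipe1 = Kleisli (\x -> return ("seen on left", x))
pipe2 = Kleisli (\x -> return ("seen on right", x))

-- (|||) comes from ArrowChoice: it runs pipe1 on Left values and
-- pipe2 on Right values, then merges both outputs into one stream.
-- 'branch' is a hypothetical name for the combined arrow.
branch :: Kleisli IO Int (String, Int)
branch = arr oddOrEven >>> (pipe1 ||| pipe2)

-- Run the arrow on a few inputs, one at a time.
main :: IO ()
main = mapM_ (\x -> runKleisli branch x >>= print) [1 .. 4]
```

The fork, the two filters, and the merge all collapse into the single (|||) combinator, which is exactly the part the proxy abstraction cannot express directly.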

Licensed under: CC-BY-SA with attribution