Say I have simple producer/consumer model where the consumer wants to pass back some state to the producer. For instance, let the downstream-flowing objects be objects we want to write to a file and the upstream objects be some token representing where the object was written in the file (e.g. an offset).

These two processes might look something like this (with pipes-4.0),

{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Pipes
import Pipes.Core
import Control.Monad.Trans.State       
import Control.Monad

newtype Object = Obj Int
               deriving (Show)

newtype ObjectId = ObjId Int
                 deriving (Show, Num)

writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
  where go = do i <- get
                obj <- lift $ request i
                lift $ lift $ putStrLn $ "Wrote "++show obj
                modify (+1)

produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
  where go [] = return ()
        go (obj:rest) = do
            lift $ putStrLn $ "Producing "++show obj
            objId <- respond obj
            lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
            go rest

objects = [ Obj i | i <- [0..10] ]

Simple as this might be, I've had a fair bit of difficulty reasoning about how to compose them. Ideally, we'd want a push-based flow of control like the following,

  1. writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream.
  2. produceObjects sends the first object, Obj 0, downstream
  3. writeObjects writes the object and increments its state, and waits on request, this time sending ObjId 1 upstream
  4. respond in produceObjects returns with ObjId 0
  5. produceObjects continues at Step (2) with the second object, Obj 1

My initial attempt was with push-based composition as follows,

main = void $ run $ produceObjects objects >>~ const writeObjects

Note the use of const to work around the otherwise incompatible types (this is likely where the problem lies). In this case, however, we find that ObjId 0 gets eaten,

Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...

A pull-based approach,

main = void $ run $ const (produceObjects objects) +>> writeObjects

suffers a similar issue, this time dropping Obj 0.

How might one go about composing these pieces in the desired manner?

有帮助吗?

解决方案

The choice of which composition to use depends on which component should initiate the entire process. If you want the downstream pipe to initiate the process then you want to use pull-based composition (i.e. (>+>)/(+>>)) but if you want the upstream pipe to initiate the process then you should use push-based composition (i.e. (>>~)/(>~>)). The type errors you got were actually warning you that there is a logical error in your code: you haven't clearly established which component initiates the process first.

From your description, it's obvious that you want control flow to begin from produceObjects so you want to use push-based composition. Once you use push-based composition, the type of the composition operator will tell you everything you need to know about how to fix your code. I'll take its type and specialize it to your composition chain:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types
(>>~) :: Server ObjectId Object IO ()
      -> (Object -> Client ObjectId Object IO ())
      -> Effect IO ()

As you already noticed, the type error you got when you tried to use (>>~) told you that you were missing an argument of type Object to your writeObjects function. This statically enforces that you cannot run any code in writeObjects before receiving your first Object (through the initial argument).

The solution is to rewrite your writeObjects function like this:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj0 = evalStateT (go obj0) (ObjId 0)
  where go obj = do i <- get
                    lift $ lift $ putStrLn $ "Wrote "++ show obj
                    modify (+1)
                    obj' <- lift $ request i
                    go obj'

This then gives the correct behavior:

>>> run $ produceObjects objects >>~ writeObjects
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 0
Producing Obj 1
Wrote Obj 1
Object Obj 1 has ID ObjId 1
Producing Obj 2
Wrote Obj 2
Object Obj 2 has ID ObjId 2
Producing Obj 3
Wrote Obj 3
Object Obj 3 has ID ObjId 3
Producing Obj 4
Wrote Obj 4
Object Obj 4 has ID ObjId 4
Producing Obj 5
Wrote Obj 5
Object Obj 5 has ID ObjId 5
Producing Obj 6
Wrote Obj 6
Object Obj 6 has ID ObjId 6
Producing Obj 7
Wrote Obj 7
Object Obj 7 has ID ObjId 7
Producing Obj 8
Wrote Obj 8
Object Obj 8 has ID ObjId 8
Producing Obj 9
Wrote Obj 9
Object Obj 9 has ID ObjId 9
Producing Obj 10
Wrote Obj 10
Object Obj 10 has ID ObjId 10

You might wonder why this requirement that one of the two pipes takes an initial argument makes sense, other than the abstract justification that this is what the category laws require. The plain English explanation is that the alternative is that you would need buffer the first transmitted Object "in between" the two pipes before writeObjects reached its first request statement. This approach produces a lot of problematic behavior and buggy corner cases, but probably the most significant problem is that pipe composition would no longer be associative and the order of effects would change based on the order in which you composed things.

The nice thing about the bidirectional pipe composition operators is that the types work out so that you can always deduce whether or not a component is "active" (i.e. initiates control) or "passive" (i.e. waits for input) purely by studying the type. If composition says that a certain pipe (like writeObjects) must take an argument, then it's passive. If it takes no argument (like produceObjects), then it's active and initiates control. So composition forces you to have at most one active pipe within your pipeline (the pipe that doesn't take an initial argument) and that's the pipe that begins control.

其他提示

The 'const's are where you are dropping the data. In order to get all the data, you probably want to do a push-based workflow as follows:

writeObjects :: Object -> Proxy ObjectId Object () X IO r
writeObjects obj = go 0 obj
  where
    go objid obj = do
        lift $ putStrLn $ "Wrote "++show obj
        obj' <- request objid
        go (objid + 1) obj'

-- produceObjects as before

main = void $ run $ produceObjects objects >>~ writeObjects

We've been discussing this on the mailing list, but I figured I'd throw it up here as well for those who are interested.

Your problem is that you have two coroutines which are both ready to spit out values at each other. Neither one needs the input of the other in order to produce a value. So who gets to go first? Well you said it yourself:

writeObjects starts by blocking on request, having sent the initial ObjId 0 upstream

Okay then, that means we need to delay produceObjects so that it waits for an ObjId signal before spitting out the corresponding object (even though it apparently doesn't need said ID).

Dipping into Proxy internals, here is the magic incantation which I will not bother to explain very carefully at this time. The basic idea is just to take input before you need it, then apply the input when needed, but then pretend like you need a new input (even though you don't need that one just yet):

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r
delayD p0 b' = case p0 of
    Request a' f -> Request a' (go . f)
    Respond b  g -> Respond b  (delayD (g b'))
    M m          -> M (liftM go m)
    Pure r       -> Pure r
  where
    go p = delayD p b'

Now, you can use this on produceObjects objects instead of const, and your second attempt works as desired:

delayD (produceObjects objects) +>> writeObjects

We're discussing delayD on the mailing list to see if it merits inclusion in the standard Pipes repertoire.

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top