Question

I'm trying to write a basic network server using pipes and the assorted libraries that build on it. The intended flow would be:

get bytestring from socket -> decode using binary -> server logic goes here -> send response to socket

Which I figured would be something like:

fromSocket s 4096 >-> decode >-> serverLogic >-> toSocket s

pipes-binary has a decode and a decodeMany, but I am not sure I understand the difference, and I don't know how to use decode. Why does decodeMany take the upstream pipe as an argument instead of being chained off of it with >->? And how do you use decode? What is the StateT for, and what should my pipe chain end up looking like?

Solution

The StateT (Producer a m r) m x idiom comes from pipes-parse's "Low-level Parsers". It typically means that the library is using draw and unDraw to pull values off a Producer and return them if they're unused. It's an essential component of parsing where failure might occur. It also requires the StateT layer to indicate that a pipe is being selectively drained and refilled in a stateful manner.

-- | Draw one element from the underlying Producer, 
-- returning Left if the Producer is empty
draw :: Monad m => StateT (Producer a m r) m (Either r a)

-- | Push back an element onto the underlying Producer
unDraw :: Monad m => a -> StateT (Producer a m r) m ()
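
As a rough illustration of the draw/unDraw idiom, here is a minimal sketch that assumes a recent pipes-parse, where draw returns Maybe a instead of the Either r a shown above and the state type is abbreviated by the Parser synonym. The names drawTwice and example are made up for this sketch.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Identity (runIdentity)
import Pipes (each)
import Pipes.Parse (Parser, draw, unDraw, evalStateT)

-- Draw one element, push it back if there was one, then draw again.
-- Both results are the same element because unDraw restores it
-- to the front of the underlying Producer.
drawTwice :: Monad m => Parser a m (Maybe a, Maybe a)
drawTwice = do
    mx <- draw
    case mx of
        Nothing -> return ()
        Just x  -> unDraw x
    my <- draw
    return (mx, my)

-- Run the parser purely against a small in-memory Producer.
example :: (Maybe Int, Maybe Int)
example = runIdentity (evalStateT drawTwice (each [1, 2, 3]))

main :: IO ()
main = print example  -- prints (Just 1,Just 1)
```

The Producer lives in the StateT state, so each draw replaces it with its own tail and each unDraw conses an element back on.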

So what does that mean for decode and decodeMany? If we look at some simplified types of those functions:

-- for (Monad m, Binary b)

decode     :: StateT (Producer ByteString m r) m (Maybe b)
decodeMany :: Producer ByteString m r 
           -> Producer' b m (Either (Producer ByteString m r) r)

We first see that decode is drawing off enough ByteString chunks from a Producer ByteString statefully so as to try to parse a b. Since the chunk boundary on the ByteStrings may not align with a parse boundary it's important to do this in StateT so that the leftover chunks can be unDraw-ed back into the Producer.

decodeMany builds atop decode and attempts to repeatedly decode bs off the input Producer returning a "continuation" Producer of leftover ByteStrings on failure.

Long story short, due to the need to unDraw leftover ByteString chunks, we can't just compose these things together into a chain with (>->). If you want a chain, you can use something like decodeMany to transform the Producer first and then chain off the result, but you'll want to handle the error cases carefully.
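
The "transform the producer, then chain the result" approach might look roughly like the sketch below. It assumes a newer pipes-binary in which the decoded lens plays the role of decodeMany, and it uses view from lens-family-core. Request, Response, serverLogic, serve, and demo are hypothetical names standing in for your own code.

```haskell
import Data.ByteString (ByteString)
import Data.Functor (void)
import Lens.Family (view)
import Pipes
import Pipes.Binary (DecodingError, decoded, encode)
import qualified Pipes.Prelude as P

-- Hypothetical request and response types; any Binary instances would do.
type Request  = Int
type Response = Int

-- Hypothetical server logic.
serverLogic :: Request -> Response
serverLogic = (* 2)

-- Decode requests from a stream of chunks and re-encode one response
-- per request. On a decoding failure the result is a Left carrying the
-- error together with the leftover, undecoded bytes.
serve
    :: Monad m
    => Producer ByteString m r
    -> Producer ByteString m (Either (DecodingError, Producer ByteString m r) r)
serve bytesIn =
    for (view decoded bytesIn) (\req -> encode (serverLogic req))

-- Pure round trip: encode three requests, serve them, decode the replies.
demo :: [Response]
demo = P.toList (void (view decoded (serve requests)))
  where
    requests = for (each [1, 2, 3 :: Request]) encode

main :: IO ()
main = print demo
```

With pipes-network, the socket version would then be something like runEffect (serve (fromSocket s 4096) >-> toSocket s), whose result tells you whether the stream ended cleanly or with a decoding error.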

OTHER TIPS

I want to complement J. Abrahamson's answer by answering your other question about why the decoder is not a Pipe.

The difference between a Pipe with a type like:

pipe :: Pipe a b m r

... and a function between Producers like (I call these "getter"s):

getter :: Producer a m r -> Producer b m r

... is that a Pipe can be used to transform Producers, Consumers, and other Pipes:

(>-> pipe) :: Producer a m r -> Producer b m r

(>-> pipe) :: Pipe x a m r -> Pipe x b m r

(pipe >->) :: Consumer b m r -> Consumer a m r

(pipe >->) :: Pipe b y m r -> Pipe a y m r

... whereas a "getter" can only transform Producers. Some things cannot be modeled correctly using Pipes and leftovers are one of those things.
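
To make the distinction concrete, here is a small sketch using only pipes. The names double, doubleGetter, and withTrailer are invented for illustration. Every Pipe gives rise to a getter, but a getter can do things a Pipe cannot, such as reacting to the upstream Producer finishing.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- A Pipe: it can sit anywhere in a (>->) chain.
double :: Monad m => Pipe Int Int m r
double = P.map (* 2)

-- Any Pipe yields a getter by composing it downstream of a Producer.
doubleGetter :: Monad m => Producer Int m r -> Producer Int m r
doubleGetter p = p >-> double

-- A getter that is not a Pipe: it waits for the upstream Producer to
-- return and then emits one more element. A Pipe never observes
-- upstream termination, so it cannot express this.
withTrailer :: Monad m => Producer Int m r -> Producer Int m r
withTrailer p = do
    r <- p
    yield 0
    return r

main :: IO ()
main = do
    print (P.toList (doubleGetter (each [1, 2])))  -- prints [2,4]
    print (P.toList (withTrailer (each [1, 2])))   -- prints [1,2,0]
    -- The Pipe also composes on the Consumer side; the getters do not.
    runEffect (each [1, 2] >-> (double >-> P.print))
```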

conduit purports to model leftovers using Conduits (the conduit analog of Pipes) but it gets this wrong. I've put together a simple example showing why. First, just implement a peek function for conduit:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Data.Conduit.List (isolate, sourceList)

peek :: Monad m => Sink a m (Maybe a)
peek = do
    ma <- await
    case ma of
        Nothing -> return ()
        Just a  -> leftover a
    return ma

This works as expected for simple cases like this:

source :: Monad m => Source m Int
source = sourceList [1, 2]

sink1 :: Show a => Sink a IO ()
sink1 = do
    ma1 <- peek
    ma2 <- peek
    lift $ print (ma1, ma2)

This will return the first element of the source twice:

>>> source $$ sink1
(Just 1,Just 1)

... but if you compose a Conduit upstream of a Sink, any leftovers that the sink pushes back are irreversibly lost:

sink2 :: Show a => Sink a IO ()
sink2 = do
    ma1 <- isolate 10 =$ peek
    ma2 <- peek
    lift $ print (ma1, ma2)

Now the second peek incorrectly returns 2:

>>> source $$ sink2
(Just 1,Just 2)

Also, note that pipes-parse just got a new major version released today, which simplifies the API and adds an extensive tutorial in the Pipes.Parse.Tutorial module.

This new API correctly propagates leftovers further upstream. Here is the analogous example for pipes:

import Lens.Family.State.Strict (zoom)
import Pipes
import Pipes.Parse
import Prelude hiding (splitAt)

parser :: Show a => Parser a IO ()
parser = do
    ma1 <- zoom (splitAt 10) peek
    ma2 <- peek
    lift $ print (ma1, ma2)

producer :: Monad m => Producer Int m ()
producer = each [1, 2]

Even though the first peek is also limited to the first 10 values, it correctly undraws the first value and makes it available to the second peek:

>>> evalStateT parser producer
(Just 1,Just 1)

Conceptually, the reason why pipes-parse "thinks in terms of Producers" is that otherwise the concept of leftovers is not clearly defined. If you don't clearly define what your source is, you can't clearly articulate where leftover values should go. This is why Pipes and Consumers do not lend themselves well to tasks that require leftovers.

Licensed under: CC-BY-SA with attribution