Question

I have a problem with the following code using network-conduit:

import Data.Conduit.List as CL
import Data.Conduit.Text as CT
import qualified Data.ByteString.Char8 as S8
import qualified Data.Text as TT

mySource :: ResourceT m => Integer -> Source m Int
mySource i = {- function -} undefined

myApp :: Application
myApp src snk = 
    src $= CT.decode CT.ascii
        $= CL.map decimal
        $= CL.map {-problem here-}
        $$ src

in problem place I want to write something like

\t -> case t of
    Left err = S8.pack $ "Error:" ++ e
    Right (i,xs) = (>>>=) mySource 
                 {- or better: 
                   do 
                   (>>>=) mySource
                   (<<<=) T.pack xs
                  -}

where the (>>>=) function pushes mySource output to the next level and (<<<=) is sending function back to previous level

Was it helpful?

Solution

The network chops up the byte stream into arbitrary ByteString chunks. With the code above, those ByteString chunks will be mapped to chunks of Text, and each chunk of Text will be parsed as a decimal. However, a string of decimal digits representing a single decimal may be split across two (or more) Text chunks. Also, as you realize, using decimal gives you back the remainder of the Text chunk that didn't get parsed as part of the decimal, which you are trying to shove back into the input stream.

Both of these problems can be solved by using Data.Conduit.Attoparsec. conduitParserEither with Data.Attoparsec.Text.decimal. Note that it is not sufficient to just parse decimal; you will also need to handle some kind of separator between decimals.

It is also not possible to splice a Source from CL.map, since CL.map's type signature is

map :: Monad m => (a -> b) -> Conduit a m b

The function you pass to map gets an opportunity to transform each input a into a single output b, not a stream of b's. To do that, you can use awaitForever, but you'll need to transform your Source into a general Producer with toProducer in order for the types to match.

However, in your code, you are trying to send parse errors downstream as ByteString's, but the output of mySource as Int's, which is a type error. You must provide a stream of ByteString in both cases; the successful parse case can return a Conduit made by fusing other Conduit's as long as it ends up with an output of ByteString:

...
$= (let f (Left err) = yield $ S8.pack $ "Error: " ++ show err
        f (Right (_, i)) = toProducer (mySource i) $= someOtherConduit
    in awaitForever f)

where someOtherConduit sinks the Int's from mySource, and sources ByteString's.

someOtherConduit :: Monad m => Conduit Int m ByteString

Finally, I believe you meant to connect the snk at the end of the pipe instead of the src.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top