how to add a new source inside a conduit haskell
Question
I have a problem with the following code using network-conduit
:
import Data.Conduit.List as CL
import Data.Conduit.Text as CT
import qualified Data.ByteString.Char8 as S8
import qualified Data.Text as TT
mySource :: ResourceT m => Integer -> Source m Int
mySource i = {- function -} undefined
myApp :: Application
myApp src snk =
src $= CT.decode CT.ascii
$= CL.map decimal
$= CL.map {-problem here-}
$$ src
in problem place I want to write something like
\t -> case t of
Left err = S8.pack $ "Error:" ++ e
Right (i,xs) = (>>>=) mySource
{- or better:
do
(>>>=) mySource
(<<<=) T.pack xs
-}
where the (>>>=)
function pushes mySource
output to the next level and
(<<<=)
is sending function back to previous level
Solution
The network chops up the byte stream into arbitrary ByteString
chunks. With the code above, those ByteString
chunks will be mapped to chunks of Text
, and each chunk of Text
will be parsed as a decimal
. However, a string of decimal digits representing a single decimal
may be split across two (or more) Text
chunks. Also, as you realize, using decimal
gives you back the remainder of the Text
chunk that didn't get parsed as part of the decimal
, which you are trying to shove back into the input stream.
Both of these problems can be solved by using Data.Conduit.Attoparsec. conduitParserEither
with Data.Attoparsec.Text.decimal
. Note that it is not sufficient to just parse decimal
; you will also need to handle some kind of separator between decimal
s.
It is also not possible to splice a Source
from CL.map
, since CL.map
's type signature is
map :: Monad m => (a -> b) -> Conduit a m b
The function you pass to map
gets an opportunity to transform each input a
into a single output b
, not a stream of b
's. To do that, you can use awaitForever
, but you'll need to transform your Source
into a general Producer
with toProducer
in order for the types to match.
However, in your code, you are trying to send parse errors downstream as ByteString
's, but the output of mySource
as Int
's, which is a type error. You must provide a stream of ByteString
in both cases; the successful parse case can return a Conduit
made by fusing other Conduit
's as long as it ends up with an output of ByteString
:
...
$= (let f (Left err) = yield $ S8.pack $ "Error: " ++ show err
f (Right (_, i)) = toProducer (mySource i) $= someOtherConduit
in awaitForever f)
where someOtherConduit
sinks the Int
's from mySource
, and sources ByteString
's.
someOtherConduit :: Monad m => Conduit Int m ByteString
Finally, I believe you meant to connect the snk
at the end of the pipe instead of the src
.