There's a known limitation in the current version of conduit: fusion always discards downstream leftovers, which is exactly what you're running into here. There are some discussions right now about an architecture to resolve this, but for the moment, writing your function in terms of the primitives is likely your best option.
How do I make a conduit that is like takeWhile but takes at most a certain number of bytes?
Question
I'm trying to make a conduit that's sort of a cross between takeWhile and isolate. That is, it will consume from the input and yield to the output until either the predicate no longer holds or it has reached the byte limit. I know the type signature will be
isolateWhile :: (Monad m) => Int -> (Word8 -> Bool) -> Conduit ByteString m ByteString
As an example of its use:
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Control.Monad.Trans.Class

charToWord = fromIntegral . fromEnum

example :: Int -> Char -> IO ()
example limit upTo = do
    untaken <- CB.sourceLbs "Hello, world!" $= conduit $$ CB.sinkLbs
    putStrLn $ "Left " ++ show untaken
  where
    conduit = do
        taken <- toConsumer $ isolateWhile limit (/= charToWord upTo) =$ CB.sinkLbs
        lift $ putStrLn $ "Took " ++ show taken
        CL.map id -- pass the rest through untouched
I expect that
ghci> example 5 'l'
Took "He"
Left "llo, world!"
ghci> example 5 'w'
Took "Hello"
Left ", world!"
However, the simplest possible definition of isolateWhile:
isolateWhile limit pred = CB.isolate limit =$= CB.takeWhile pred
yields
ghci> example 5 'l'
Took "He"
Left ", world!"
ghci> example 5 'w'
Took "Hello"
Left ", world!"
In other words, isolate will eat up the entire Hello, leaving He to takeWhile and discarding the llo. This data loss is undesirable for my application. However, it is notable that the second case yields the expected result.
If I swap the operands of =$= like so:
isolateWhile limit pred = CB.takeWhile pred =$= CB.isolate limit
Then
ghci> example 5 'l'
Took "He"
Left ", world!"
ghci> example 5 'w'
Took "Hello"
Left ""
Now I've fixed the first test, but broken the second one! This time, takeWhile will take whatever it needs and isolate will take a subset of that; but whatever takeWhile uses that isolate doesn't will be discarded, and this is undesirable.
Lastly, I tried:
isolateWhile limit pred = do
    untaken <- CB.isolate limit =$= (CB.takeWhile pred >> CL.consume)
    mapM_ leftover $ reverse untaken
This actually works! Whatever isolate accepts and takeWhile doesn't is consumed by the CL.consume and placed back into the stream with leftover. Unfortunately, this seems like a horrible kludge, and undesirably (although not unusably so) it will buffer up to limit bytes in memory only to put it back with leftover. That seems like a waste.
The only solution I can think of is to write it in terms of the primitives await, yield and leftover as takeWhile and isolate are themselves written. While this would solve all the problems without wasting much, it seems like there must be a better way.
Am I missing something, or is there really no better way to write this?
Solution
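As a rough illustration of the primitives-based approach, here is a minimal sketch of isolateWhile written directly with await, yield and leftover. It is not taken from the conduit library. It assumes conduit 1.3 or later (the ConduitT type, .| and runConduitPure), which is newer than the API used in the question. The splitInput helper is hypothetical and exists only so the behaviour can be tried out.

```haskell
import           Control.Monad     (unless)
import           Data.ByteString   (ByteString)
import qualified Data.ByteString   as S
import           Data.Conduit      (ConduitT, await, leftover, runConduitPure,
                                    yield, (.|))
import qualified Data.Conduit.List as CL
import           Data.Word         (Word8)

-- | Pass bytes through while the predicate holds, up to 'limit' bytes in
-- total. Whatever is not passed through is pushed back with 'leftover'.
-- Assumption: conduit >= 1.3 for the 'ConduitT' type.
isolateWhile :: Monad m
             => Int -> (Word8 -> Bool) -> ConduitT ByteString ByteString m ()
isolateWhile limit p = loop limit
  where
    loop remaining
      | remaining <= 0 = return ()
      | otherwise      = await >>= maybe (return ()) (step remaining)

    step remaining chunk = do
      let window = S.take remaining chunk        -- bytes still within the limit
          good   = S.takeWhile p window          -- prefix satisfying the predicate
          rest   = S.drop (S.length good) chunk  -- everything that was not taken
      unless (S.null good) $ yield good
      if S.null rest
        then loop (remaining - S.length good)    -- chunk fully used, keep going
        else leftover rest                       -- stop and hand the rest back

-- | Hypothetical helper for experimenting: returns (taken, untaken).
splitInput :: Int -> (Word8 -> Bool) -> [ByteString] -> (ByteString, ByteString)
splitInput limit p chunks = runConduitPure $ CL.sourceList chunks .| do
    taken <- isolateWhile limit p .| CL.consume
    rest  <- CL.consume
    return (S.concat taken, S.concat rest)
```

Because isolateWhile sits upstream of the fusion in this usage, its leftovers go back to the original input rather than being discarded, and at most one chunk is held at a time. On the older API from the question, the same body should type-check with the signature written as Conduit ByteString m ByteString.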