How do I make a conduit like takeWhile that takes at most a certain number of bytes?

StackOverflow https://stackoverflow.com/questions/19483645

  •  01-07-2022

Question

I'm trying to make a conduit that's sort of a cross between takeWhile and isolate. That is, it will consume from the input and yield to the output until either the predicate no longer holds or it has reached the byte limit. I know the type signature will be

isolateWhile :: (Monad m) => Int -> (Word8 -> Bool) -> Conduit ByteString m ByteString

As an example of its use:

{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit
import qualified Data.Conduit.List   as CL
import qualified Data.Conduit.Binary as CB
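import Control.Monad.Trans.Class
import Data.ByteString (ByteString)
import Data.Word (Word8)

-- Convert an ASCII character to the byte that represents it.
charToWord :: Char -> Word8
charToWord = fromIntegral . fromEnum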

example :: Int -> Char -> IO ()
example limit upTo = do
    untaken <- CB.sourceLbs "Hello, world!" $= conduit $$ CB.sinkLbs
    putStrLn $ "Left " ++ show untaken
  where
    conduit = do
      taken <- toConsumer $ isolateWhile limit (/= charToWord upTo) =$ CB.sinkLbs
      lift $ putStrLn $ "Took " ++ show taken
      CL.map id  -- pass the rest through untouched

I expect that

ghci> example 5 'l'
Took "He"
Left "llo, world!"
ghci> example 5 'w'
Took "Hello"
Left ", world!"

However, the simplest possible definition of isolateWhile:

isolateWhile limit pred = CB.isolate limit =$= CB.takeWhile pred

yields

ghci> example 5 'l'
Took "He"
Left ", world!"
ghci> example 5 'w'
Took "Hello"
Left ", world!"

In other words, isolate consumes the entire "Hello", takeWhile passes on only "He", and the "llo" is discarded. This data loss is undesirable for my application. Note, however, that the second case yields the expected result.

If I swap the operands of =$= like so:

isolateWhile limit pred = CB.takeWhile pred =$= CB.isolate limit

Then

ghci> example 5 'l'
Took "He"
Left "llo, world!"
ghci> example 5 'w'
Took "Hello"
Left ""

Now I've fixed the first test, but broken the second one! This time, takeWhile takes whatever it needs and isolate takes a prefix of that; whatever takeWhile consumes that isolate does not pass on is discarded, which is just as undesirable.

Lastly, I tried:

isolateWhile limit pred = do
  untaken <- CB.isolate limit =$= (CB.takeWhile pred >> CL.consume)
  mapM_ leftover $ reverse untaken

This actually works! Whatever isolate accepts and takeWhile doesn't is consumed by CL.consume and put back into the stream with leftover. Unfortunately, this seems like a horrible kludge: it buffers up to limit bytes in memory only to put them back with leftover. That is not unusable, but it seems like a waste.

The only solution I can think of is to write it in terms of the primitives await, yield and leftover as takeWhile and isolate are themselves written. While this would solve all the problems without wasting much, it seems like there must be a better way.

Am I missing something, or is there really no better way to write this?


Solution

There's a known limitation in the current version of conduit: fusion always discards downstream leftovers, which is exactly what you're running into here. There are some discussions right now about an architecture to resolve this, but for the moment, writing your function in terms of the primitives is likely your best option.
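
For illustration, here is a minimal sketch of what a primitive-based version might look like. It is not taken from the conduit library itself. It assumes conduit 1.3 or later, where the question's Conduit ByteString m ByteString is spelled ConduitT ByteString ByteString m (), and the demo helper is a hypothetical name added only to try the conduit out.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad     (unless)
import           Data.ByteString   (ByteString)
import qualified Data.ByteString   as S
import           Data.Conduit
import qualified Data.Conduit.List as CL
import           Data.Word         (Word8)

-- | Pass bytes downstream while the predicate holds, but never more than
-- `limit` bytes in total. Bytes that are not passed on are handed back to
-- the input stream with `leftover`, so nothing is lost.
-- Assumption: conduit >= 1.3 (ConduitT instead of the older Conduit synonym).
isolateWhile :: Monad m
             => Int -> (Word8 -> Bool) -> ConduitT ByteString ByteString m ()
isolateWhile limit p = loop limit
  where
    -- n is the number of bytes we are still allowed to pass on.
    loop n
      | n <= 0    = return ()
      | otherwise = await >>= maybe (return ()) (go n)

    go n bs
      -- The whole chunk was accepted: emit it and keep reading.
      | S.null rest = do
          unless (S.null taken) (yield taken)
          loop (n - S.length taken)
      -- We stopped inside the chunk (limit reached or predicate failed):
      -- return the remainder first, then emit what was accepted, and finish.
      | otherwise = do
          leftover rest
          unless (S.null taken) (yield taken)
      where
        taken = S.takeWhile p (S.take n bs)   -- apply limit, then predicate
        rest  = S.drop (S.length taken) bs    -- everything not accepted

-- | Hypothetical helper for experimenting: feed some chunks through
-- isolateWhile and return (bytes taken, bytes left in the stream).
demo :: Int -> Word8 -> [ByteString] -> (ByteString, ByteString)
demo limit stopAt chunks = runConduitPure $ CL.sourceList chunks .| do
  taken <- isolateWhile limit (/= stopAt) .| CL.consume
  rest  <- CL.consume
  return (S.concat taken, S.concat rest)
```

With this sketch, demo 5 108 ["Hello, world!"] (108 is the byte for 'l') gives ("He","llo, world!"), and demo 5 119 ["Hello, world!"] (119 is 'w') gives ("Hello",", world!"), matching the expected output in the question. The leftover call is deliberately placed before the yield: if the downstream consumer stops after receiving the chunk, the code after yield may never run, and the unused bytes would be lost again. Only one chunk is held at a time, so nothing is buffered up to the limit.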

Licensed under: CC-BY-SA with attribution