Question

The goal is to have a conduit with the following type signature

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a

The conduit should repeatedly parse protocol buffers (using the ByteString -> a function) received via TCP/IP (using the network-conduit package).

The wire message format is

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...

(The curly braces are not part of the protocol; they are only used here to separate the entities.)
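To make the wire format concrete, here is a minimal sketch of the sending side, using only the binary and bytestring packages. The names frame and frameAll are made up for this illustration, and the payload is assumed to be shorter than 2^32 bytes.

```haskell
import qualified Data.ByteString      as BS
import qualified Data.ByteString.Lazy as BSL
import           Data.Binary.Put      (putByteString, putWord32be, runPut)

-- Prefix one payload with its length as a 32-bit big-endian integer.
-- Assumption: the payload is shorter than 2^32 bytes.
frame :: BS.ByteString -> BSL.ByteString
frame payload = runPut $ do
    putWord32be (fromIntegral (BS.length payload))
    putByteString payload

-- Concatenate several framed payloads into one wire stream:
-- {length}{payload 1}{length}{payload 2}...
frameAll :: [BS.ByteString] -> BSL.ByteString
frameAll = BSL.concat . map frame
```

For the two payloads "hi" and "!" this yields the length bytes 0 0 0 2, then the two bytes of "hi", then 0 0 0 1, then the single byte of "!".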

The first idea was to use sequenceSink to repeatedly apply a Sink that is able to parse one ProtoBuf:

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lengthBytes <- CB.take 4                             -- read protobuf length
           let len :: Word32
               len = B.decode lengthBytes                       -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf

It doesn't work (it only works for the first protocol buffer), because there seem to be "leftover" bytes that were already read from the source but not consumed via CB.take, and these get discarded.

And I found no way of pushing "the rest back into the source".

Did I get the concept entirely wrong?

PS: Although I use protocol buffers here, the problem is not related to protocol buffers. To debug the problem I always use {length}{UTF8 encoded string}{length}{UTF8 encoded string}... and a conduit similar to the one above (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).

Update:

I then tried replacing the unit state (() in the sample above) with the bytes that were read but not yet consumed, and replacing the CB.take calls with a function that first consumes bytes from that state and calls await only when the state does not hold enough bytes. Unfortunately, that doesn't work either: as soon as the Source has no bytes left, sequenceSink stops running the sink, even though the state still contains unconsumed bytes :-(.

In case you are interested, here is the code (it isn't optimized or very good, but it should be enough to test):

utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
     in if stateLenSum >= neededLen
           then do let (firstChunk:state') = state
                       (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                       acc' = acc `BS.append` neededChunk
                       neededLen' = neededLen - BS.length neededChunk
                       state'' = if BS.null pushBack
                                    then state'
                                    else pushBack:state'
                   takeWithState acc' state'' neededLen'
           else do aM <- await
                   case aM of
                     Just a -> takeWithState acc (state ++ [a]) neededLen
                     Nothing -> error "to be fixed later"

Solution

For protocol buffer parsing and serializing we use messageWithLengthPutM and messageWithLengthGetM (see below), but I assume they use a varint encoding for the length, which is not what you need. I'd probably try to adapt our implementation below by replacing the messageWithLength Get/Put with something like

myMessageWithLengthGetM = 
   do size <- getWord32be 
      getMessageWithSize size

but I have no idea how to implement getMessageWithSize using the available functions from the protocol buffer package. Alternatively, you could read the payload with getByteString and then parse that ByteString in a second step.
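The "read the bytes, then reparse" idea can be sketched with Data.Binary.Get alone. This is only an illustration under assumptions: frameGet and decodeAll are hypothetical names, the decode argument stands in for whatever ByteString -> a function you have, and nothing here comes from the protocol buffer package.

```haskell
import qualified Data.ByteString      as BS
import qualified Data.ByteString.Lazy as BSL
import           Data.Binary.Get      (Get, getByteString, getWord32be,
                                       runGetOrFail)

-- Read one frame: a 32-bit big-endian length, then that many payload bytes,
-- which are handed to the caller-supplied decoder.
frameGet :: (BS.ByteString -> a) -> Get a
frameGet decode = do
    size    <- getWord32be
    payload <- getByteString (fromIntegral size)
    return (decode payload)

-- Decode every complete frame in the input and stop at the first
-- incomplete one (a real implementation would probably report it).
decodeAll :: (BS.ByteString -> a) -> BSL.ByteString -> [a]
decodeAll decode input
    | BSL.null input = []
    | otherwise =
        case runGetOrFail (frameGet decode) input of
          Left _               -> []
          Right (rest, _, msg) -> msg : decodeAll decode rest
```

Because runGetOrFail returns the unconsumed rest of the input, the bytes following one frame are passed on to the next iteration instead of being dropped.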

Regarding conduits: Have you tried implementing the conduit without Data.Conduit.Util? Something like

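protobufConduit protobufDecode = loop
   where
      loop =
         do lenBytes <- CB.take 4
            -- CB.take returns an empty (lazy) ByteString at end of input, so stop there.
            -- BSL is Data.ByteString.Lazy; convertLen (length bytes -> Int) is left to you.
            unless (BSL.null lenBytes) $
               do bs <- CB.take (convertLen lenBytes)
                  yield (protobufDecode bs)
                  loop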

Here's the code we use:

pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
    where f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
    where
      new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
      read parse =
          do mbs <- await
             case mbs of
               Just bs -> checkResult (parse bs)
               Nothing -> return ()
      checkResult result =
          case result of
            Failed _ errmsg -> fail errmsg
            Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
            Finished rest _ msg ->
                do yield msg
                   checkResult (runGet messageWithLengthGetM rest)
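The Partial/Finished handling in pbufParse can be tried out without conduit or the protocol buffer package. The sketch below is an assumption-laden stand-in: it uses the incremental decoder from Data.Binary.Get (whose constructors are Fail, Partial and Done), a fixed 32-bit big-endian length instead of messageWithLengthGetM, and the made-up names rawFrame and feedChunks. The list of chunks plays the role of the values that await would deliver.

```haskell
import qualified Data.ByteString as BS
import           Data.Binary.Get (Decoder (..), Get, getByteString,
                                  getWord32be, pushChunk, runGetIncremental)

-- One frame: a 32-bit big-endian length followed by the payload bytes.
rawFrame :: Get BS.ByteString
rawFrame = getWord32be >>= getByteString . fromIntegral

-- Feed arbitrarily split chunks to the decoder and collect every payload.
-- Bytes left over after a frame are put back in front of the remaining
-- chunks, so they are fed to a fresh decoder instead of being discarded.
feedChunks :: [BS.ByteString] -> Either String [BS.ByteString]
feedChunks = go fresh
  where
    fresh = runGetIncremental rawFrame

    go (Fail _ _ err)    _      = Left err
    go (Done rest _ msg) chunks = (msg :) <$> go fresh (rest : chunks)
    go (Partial _)       []     = Right []   -- input ended; an unfinished frame is ignored
    go d@(Partial _)     (c:cs)
      | BS.null c = go d cs                  -- skip empty chunks
      | otherwise = go (d `pushChunk` c) cs
```

The chunk boundaries do not have to line up with the frame boundaries, which is the situation that made the sequenceSink version lose data after the first message.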
Licensed under: CC-BY-SA with attribution