Question

I'm using aeson / attoparsec and conduit / http-conduit, connected by attoparsec-conduit, to parse JSON data from a file / webserver. My problem is that my pipeline always throws this exception...

ParseError {errorContexts = ["demandInput"], errorMessage = "not enough bytes", errorPosition = 1:1}

...once the socket closes or we hit EOF. Parsing and passing on the resulting data structures through the pipeline etc. works just fine, but it always ends with the sinkParser throwing this exception. I invoke it like this...

j <- CA.sinkParser json

...inside of my conduit that parses ByteStrings into my message structures.

How can I have it just exit the pipeline cleanly once there is no more data (no more top-level expressions)? Is there any decent way to detect / distinguish this exception without having to look at error strings?

Thanks!

EDIT: Example:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Applicative
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Data.Conduit.Attoparsec as CA
import Data.Aeson
import Data.Conduit
import Data.Conduit.Binary
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (MonadResource, runResourceT)

data MyMessage = MyMessage String deriving (Show)

parseMessage :: (MonadIO m, MonadResource m) => Conduit B.ByteString m B.ByteString
parseMessage = do
    j <- CA.sinkParser json
    let msg = fromJSON j :: Result MyMessage
    yield $ case msg of
        Success r -> B8.pack $ show r
        Error   s -> error s
    parseMessage

main :: IO ()
main =
    runResourceT $ do
        sourceFile "./input.json" $$ parseMessage =$ sinkFile "./out.txt"

instance FromJSON MyMessage where
    parseJSON j =
        case j of
        (Object o) -> MyMessage <$> o .: "text"
        _          -> fail $ "Expected Object - " ++ show j

Sample input (input.json):

{"text":"abc"}
{"text":"123"}

Outputs:

out: ParseError {errorContexts = ["demandInput"], errorMessage = "not enough bytes", errorPosition = 3:1}

and out.txt:

MyMessage "abc"MyMessage "123"

Solution

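This is a perfect use case for conduitParserEither. Unlike a hand-rolled loop around sinkParser, it checks for end of input before starting each parse, so the stream ends cleanly once there is no more data, and it reports failures as Left values instead of throwing: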

parseMessage :: (MonadIO m, MonadResource m) => Conduit B.ByteString m B.ByteString
parseMessage =
    CA.conduitParserEither json =$= awaitForever go
  where
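    -- A failed parse arrives as a Left; this version still aborts on it.
    go (Left s) = error $ show s
    -- msg is the parsed aeson Value; the PositionRange is ignored.
    go (Right (_, msg)) = yield $ B8.pack $ show msg ++ "\n"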
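
The snippet above shows the raw aeson Value. To get back the MyMessage output from the question, the Right branch can go through fromJSON. What follows is only a sketch under some assumptions that are not in the original post: it targets conduit 1.3 or later (ConduitT, runConduit and .| instead of $$ and =$), takes Data.Conduit.Attoparsec from conduit-extra, imports json from Data.Aeson.Parser, and feeds the pipeline from an in-memory list rather than input.json so it can be run on its own. It also emits error lines instead of calling error, which is a design choice of the sketch, not of the answer.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main where

import           Data.Aeson              (FromJSON (..), Result (..), fromJSON,
                                          withObject, (.:))
import           Data.Aeson.Parser       (json)
import qualified Data.ByteString         as B
import qualified Data.ByteString.Char8   as B8
import           Data.Conduit            (ConduitT, awaitForever, runConduit,
                                          yield, (.|))
import qualified Data.Conduit.Attoparsec as CA
import qualified Data.Conduit.List       as CL

newtype MyMessage = MyMessage String deriving (Show)

instance FromJSON MyMessage where
    parseJSON = withObject "MyMessage" $ \o -> MyMessage <$> o .: "text"

-- Parse a stream of top-level JSON values and render each as one line.
parseMessage :: Monad m => ConduitT B.ByteString B.ByteString m ()
parseMessage = CA.conduitParserEither json .| awaitForever go
  where
    -- Malformed JSON: conduitParserEither yields one Left and then stops.
    go (Left err) = yield $ B8.pack $ "parse error: " ++ show err ++ "\n"
    -- Well-formed JSON: decode the Value into MyMessage.
    go (Right (_, v)) = case fromJSON v of
        Success m -> yield $ B8.pack $ show (m :: MyMessage) ++ "\n"
        Error s   -> yield $ B8.pack $ "decode error: " ++ s ++ "\n"

main :: IO ()
main = do
    -- In-memory stand-in for sourceFile "./input.json" (an assumption
    -- made so the sketch is self-contained).
    out <- runConduit $
        CL.sourceList ["{\"text\":\"abc\"}\n", "{\"text\":\"123\"}"]
            .| parseMessage
            .| CL.consume
    B8.putStr (B.concat out)
```

Swapping CL.sourceList for sourceFile and CL.consume for sinkFile (inside runResourceT or runConduitRes) should give the file-to-file pipeline from the question.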

If you're on FP Haskell Center, you can clone my solution into the IDE.
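
On the second part of the question (telling "ran out of input" apart from a real parse failure without reading error strings), one possible approach is to keep the hand-written loop but check for end of input before every parser call, so that any error that does come back is a genuine failure. The sketch below assumes conduit 1.3 or later and conduit-extra; parseLoop is a hypothetical name, and sinkParserEither is used so the failure arrives as a value rather than an exception.

```haskell
{-# LANGUAGE OverloadedStrings #-}

module Main where

import           Data.Aeson.Parser       (json)
import qualified Data.ByteString         as B
import qualified Data.ByteString.Char8   as B8
import           Data.Conduit            (ConduitT, await, leftover,
                                          runConduit, yield, (.|))
import qualified Data.Conduit.Attoparsec as CA
import qualified Data.Conduit.List       as CL

-- Hypothetical helper: parse top-level JSON values until upstream is done.
parseLoop :: Monad m => ConduitT B.ByteString B.ByteString m ()
parseLoop = do
    mchunk <- await
    case mchunk of
        -- Upstream is exhausted: stop cleanly without invoking the parser.
        Nothing -> return ()
        Just chunk
            -- Empty chunks carry no data; look at the next one.
            | B.null chunk -> parseLoop
            | otherwise -> do
                -- Put the chunk back so the parser sees it.
                leftover chunk
                result <- CA.sinkParserEither json
                case result of
                    -- Report the failure and stop.
                    Left err -> yield $ B8.pack $
                        "parse error: " ++ show err ++ "\n"
                    -- Emit the parsed Value and continue with the rest.
                    Right v -> do
                        yield $ B8.pack $ show v ++ "\n"
                        parseLoop

main :: IO ()
main = do
    out <- runConduit $
        CL.sourceList ["{\"text\":\"abc\"}", "{\"text\":\"123\"}"]
            .| parseLoop
            .| CL.consume
    B8.putStr (B.concat out)
```

One caveat of this sketch: input that ends in trailing whitespace, such as a final newline, is not an empty chunk, so it still reaches the parser and produces a Left. Stripping or skipping whitespace before the end-of-input check would be needed to treat that case as a clean end.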

Licensed under: CC-BY-SA with attribution