Question

I have a directory full of CSV dump files that I need to parse and process. Each file name contains a timestamp that I know how to extract. Then I need to process all lines from dumps, but for each line I need to know what was the file timestamp it came from. I can get the timestamps separately, and I can source all lines from files separately (following How do I implement `cat` in Haskell?) but cannot find a way to combine them. Any ideas?

{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import Data.Conduit (($=), ($$))
import qualified Data.Conduit as C (Conduit, awaitForever)
import qualified Data.Conduit.Binary as C (sinkHandle, sourceFile)
import qualified Data.Conduit.Combinators as C (map, sourceDirectory, unlines)
import qualified Data.Conduit.Text as C (decode, encode, utf8)
import Data.Text (Text, pack)
import Data.Time (LocalTime)
import Filesystem.Path.CurrentOS (FilePath)
import System.IO (stdout)
import Prelude hiding (FilePath)

decodeFilePath :: FilePath -> Text
decodeFilePath = undefined

decodeFilePathToString :: FilePath -> String
decodeFilePathToString = undefined

extractTimestamp :: Text -> LocalTime
extractTimestamp = undefined

readFileConduit :: MonadResource m => C.Conduit FilePath m Text
readFileConduit =
  C.awaitForever (\fp -> C.sourceFile (decodeFilePathToString fp) $= C.decode C.utf8)

readFileWithTimestampConduit :: MonadResource m => C.Conduit FilePath m (LocalTime, Text)
readFileWithTImestampConduit = ???

main :: IO ()
main = do
  runResourceT $
    C.sourceDirectory "data/dumps" $=
    C.map (pack . show . extractTimestamp . decodeFilePath) $=
    C.unlines $=
    C.encode C.utf8 $$
    C.sinkHandle stdout
  runResourceT $
    C.sourceDirectory "data/dumps" $=
    readFileConduit $=
    C.unlines $=
    C.encode C.utf8 $$
    C.sinkHandle stdout

EDIT: Thanks to acomar, I have this solution:

readFileWithTimestampConduit :: MonadResource m => C.Conduit FilePath m (LocalTime, Text)
readFileWithTimestampConduit =
  C.awaitForever (\fp ->
    C.sourceFile (decodeFilePathToString fp) $=
    C.decode C.utf8 $=
    C.linesUnbounded $=
    C.map (\t -> (extractTimestamp $ decodeFilePath fp, t)))

Can anybody think of an approach involving zipping up one conduit containing timestamps with another conduit containing lines from files? That is what I was trying to do before I asked this question.

Was it helpful?

Solution

Answer to the updated question:

That's not going to work very well because of how Conduits are set up. Notice that the incoming type for a conduit is fixed to receive a single value:

MonadResource m => C.Conduit i m o

If you want to take multiple inputs, you have to request them as a pair

MonadResource m => C.Conduit (i1, i2) m o

But to give that conduit its input, you've already written your existing solution!

On the other hand, if there was a way to make Conduits into Arrows, you could do this without much difficulty with the (***) function that does exactly what you want -- namely, take two input arrows and combine them to produce an arrow that acts on the pair. My understanding is that there isn't a good way to turn Conduits into Arrows. However, if you downgrade readFileConduit to readFile (working with a Handle and hGetContents) you can compose extractTimeStamp and readFile inside the IO monad.

do let timestamp = extractTimeStamp <..args..>
   contents <- readFile <..args..>
   return (timestamp, contents)

You can then lift that function into a conduit with a simple lift.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top