Question

I'm trying to read some irregular input (for example, a commands, that can appear from time to time) from file. E.g. initially source file is empty, and my program was started. Then a some string was appended to the file, and my program must read this string.

A first naive implementation:

import System.IO
import Control.Monad

listen :: Handle -> IO ()
listen file = forever $ do
    ineof <- hIsEOF file
    if ineof
        then do
            s <- hGetLine file
            putStrLn s
        else
            return ()

But it's not working properly of course (because of a performance issues first of all). How can I implement this correctly (maybe with a conduits usage)?

Was it helpful?

Solution

I've put together an example of implementing this below. The basic idea is:

  • Monitor for file changes using the fsnotify package.
  • Use sourceFileRange to stream the previously unconsumed portions of the file.
  • Use an MVar to let the fsnotify callback signal the Source to continue reading.

This assumes that the source file is only ever added to, never delete or shortened.

import           Control.Concurrent        (forkIO, threadDelay)
import           Control.Concurrent.MVar   (MVar, newEmptyMVar, putMVar,
                                            takeMVar)
import           Control.Exception         (IOException, try)
import           Control.Monad             (forever, void, when)
import           Control.Monad.IO.Class    (liftIO)
import           Data.ByteString           (ByteString)
import qualified Data.ByteString           as S
import           Data.Conduit              (MonadResource, Source, bracketP,
                                            runResourceT, ($$), ($=))
import           Data.Conduit.Binary       (sourceFileRange)
import qualified Data.Conduit.List         as CL
import           Data.IORef                (IORef, modifyIORef, newIORef,
                                            readIORef)
import           Data.Time                 (getCurrentTime)
import           Filesystem                (canonicalizePath)
import           Filesystem.Path.CurrentOS (decodeString, directory)
import           System.FSNotify           (Event (..), startManager,
                                            stopManager, watchDir)

tryIO :: IO a -> IO (Either IOException a)
tryIO = try

sourceFileForever :: MonadResource m => FilePath -> Source m ByteString
sourceFileForever fp' = bracketP startManager stopManager $ \manager -> do
    fp <- liftIO $ canonicalizePath $ decodeString fp'
    baton <- liftIO newEmptyMVar
    liftIO $ watchDir manager (directory fp) (const True) $ \event -> void $ tryIO $ do
        fpE <- canonicalizePath $
            case event of
                Added x _ -> x
                Modified x _ -> x
                Removed x _ -> x
        when (fpE == fp) $ putMVar baton ()
    consumedRef <- liftIO $ newIORef 0
    loop baton consumedRef
  where
    loop :: MonadResource m => MVar () -> IORef Integer -> Source m ByteString
    loop baton consumedRef = forever $ do
        consumed <- liftIO $ readIORef consumedRef
        sourceFileRange fp' (Just consumed) Nothing $= CL.iterM counter
        liftIO $ takeMVar baton
      where
        counter bs = liftIO $ modifyIORef consumedRef (+ fromIntegral (S.length bs))

main :: IO ()
main = do
    let fp = "foo.txt"
    writeFile fp "Hello World!"
    _ <- forkIO $ runResourceT $ sourceFileForever fp $$ CL.mapM_ (liftIO . print)
    forever $ do
        now <- getCurrentTime
        appendFile fp $ show now ++ "\n"
        threadDelay 1000000
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top