Question

I'm trying to write an enumerator for reading files line by line from a java.io.BufferedReader using Scalaz 7's iteratee library, which currently only provides an (extremely slow) enumerator for java.io.Reader.

The problems I'm running into are related to the fact that all of the other iteratee libraries I've used (e.g. Play 2.0's and John Millikin's enumerator for Haskell) have had an error state as one of their Step type's constructors, and Scalaz 7 doesn't.

My current implementation

Here's what I currently have. First for some imports and IO wrappers:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I, _ }

def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
def readLine(r: BufferedReader) = IO(Option(r.readLine))
def closeReader(r: BufferedReader) = IO(r.close())

And a type alias to clean things up a bit:

type ErrorOr[A] = Either[Throwable, A]

And now a tryIO helper, modeled (loosely, and probably wrongly) on the one in enumerator:

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, ErrorOr[B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

An enumerator for the BufferedReader itself:

def enumBuffered(r: => BufferedReader) = new EnumeratorT[ErrorOr[String], IO] {
  lazy val reader = r
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(readLine(reader)) flatMap {
      case Right(None)       => s.pointI
      case Right(Some(line)) => k(I.elInput(Right(line))) >>== apply[A]
      case Left(e)           => k(I.elInput(Left(e)))
    }
  )
}

And finally an enumerator that's responsible for opening and closing the reader:

def enumFile(f: File) = new EnumeratorT[ErrorOr[String], IO] {
  def apply[A] = (s: StepT[ErrorOr[String], IO, A]) => s.mapCont(k =>
    tryIO(openFile(f)) flatMap {
      case Right(reader) => I.iterateeT(
        enumBuffered(reader).apply(s).value.ensuring(closeReader(reader))
      )
      case Left(e) => k(I.elInput(Left(e)))
    }
  )
}

Now suppose for example that I want to collect all the lines in a file that contain at least twenty-five '0' characters into a list. I can write:

val action: IO[ErrorOr[List[String]]] = (
  I.consume[ErrorOr[String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 25)) &=
  enumFile(new File("big.txt"))
).run.map(_.sequence)

In many ways this seems to work beautifully: I can kick the action off with unsafePerformIO and it will chunk through tens of millions of lines and gigabytes of data in a couple of minutes, in constant memory and without blowing the stack, and then close the reader when it's done. If I give it the name of a file that doesn't exist, it will dutifully give me back the exception wrapped in a Left, and enumBuffered at least seems to behave appropriately if it hits an exception while reading.

Potential problems

I have some concerns about my implementation, though—particularly of tryIO. For example, suppose I try to compose a few iteratees:

val it = for {
  _ <- tryIO[Unit, Unit](IO(println("a")))
  _ <- tryIO[Unit, Unit](IO(throw new Exception("!")))
  r <- tryIO[Unit, Unit](IO(println("b")))
} yield r

If I run this, I get the following:

scala> it.run.unsafePerformIO()
a
b
res11: ErrorOr[Unit] = Right(())

If I try the same thing with enumerator in GHCi, the result is more like what I'd expect:

...> run $ tryIO (putStrLn "a") >> tryIO (error "!") >> tryIO (putStrLn "b")
a
Left !

I just don't see a way to get this behavior without an error state in the iteratee library itself.

My questions

I don't claim to be any kind of expert on iteratees, but I have used the various Haskell implementations in a few projects, feel like I more or less understand the fundamental concepts, and had coffee with Oleg once. I'm at a loss here, though. Is this a reasonable way to handle exceptions in the absence of an error state? Is there a way to implement tryIO that would behave more like the enumerator version? Is there some kind of time bomb waiting for me in the fact that my implementation behaves differently?


Solution

EDIT: Here is the real solution. I left in the original post because I think it's worthwhile seeing the pattern. What works for Kleisli works for IterateeT.

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

object IterateeIOExample {
  type ErrorOr[+A] = EitherT[IO, Throwable, A]

  def openFile(f: File) = IO(new BufferedReader(new FileReader(f)))
  def readLine(r: BufferedReader) = IO(Option(r.readLine))
  def closeReader(r: BufferedReader) = IO(r.close())

  def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] {
    EitherT.fromEither(action.catchLeft).map(r => I.sdone(r, I.emptyInput))
  }

  def enumBuffered(r: => BufferedReader) = new EnumeratorT[String, ErrorOr] {
    lazy val reader = r
    def apply[A] = (s: StepT[String, ErrorOr, A]) => s.mapCont(k =>
      tryIO(readLine(reader)) flatMap {
        case None => s.pointI
        case Some(line) => k(I.elInput(line)) >>== apply[A]
      })
  }

  def enumFile(f: File) = new EnumeratorT[String, ErrorOr] {
    def apply[A] = (s: StepT[String, ErrorOr, A]) => 
      tryIO(openFile(f)).flatMap(reader => I.iterateeT[String, ErrorOr, A](
        EitherT(
          enumBuffered(reader).apply(s).value.run.ensuring(closeReader(reader)))))
  }

  def main(args: Array[String]): Unit = {
    val action = (
      I.consume[String, ErrorOr, List] %=
      I.filter(a => a.count(_ == '0') >= 25) &=
      enumFile(new File(args(0)))).run.run

    println(action.unsafePerformIO().map(_.size))
  }
}

===== Original Post =====

I feel like you need an EitherT in the mix. Without EitherT you just end up with three separate Lefts or Rights, one per step. With EitherT, the first Left would propagate.

I think what you really want is

type ErrorOr[+A] = EitherT[IO, Throwable, A] 
I.iterateeT[A, ErrorOr, B]

The following code mimics how you are currently composing things. Because IterateeT has no concept of left and right, when you compose it, you just end up with a bunch of IO/Id's.

scala> Kleisli((a:Int) => 4.right[String].point[Id])
res11: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@73e771ca

scala> Kleisli((a:Int) => "aa".left[Int].point[Id])
res12: scalaz.Kleisli[scalaz.Scalaz.Id,Int,scalaz.\/[String,Int]] = scalaz.KleisliFunctions$$anon$18@be41b41

scala> for { a <- res11; b <- res12 } yield (a,b)
res15: scalaz.Kleisli[scalaz.Scalaz.Id,Int,(scalaz.\/[String,Int], scalaz.\/[String,Int])] = scalaz.KleisliFunctions$$anon$18@42fd1445

scala> res15.run(1)
res16: (scalaz.\/[String,Int], scalaz.\/[String,Int]) = (\/-(4),-\/(aa))

In the following code, instead of using Id, we use an EitherT. Since EitherT has the same bind behaviour as Either, we end up with what we want.

scala>  type ErrorOr[+A] = EitherT[Id, String, A]
defined type alias ErrorOr

scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT(4.right[String].point[Id]))
res22: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@58b547a0

scala> Kleisli[ErrorOr, Int, Int]((a:Int) => EitherT("aa".left[Int].point[Id]))
res24: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@342f2ceb

scala> for { a <- res22; b <- res24 } yield 2
res25: scalaz.Kleisli[ErrorOr,Int,Int] = scalaz.KleisliFunctions$$anon$18@204eab31

scala> res25.run(2).run
res26: scalaz.Scalaz.Id[scalaz.\/[String,Int]] = -\/(aa)

You can replace Kleisli with IterateeT and Id with IO to get what you need.
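The same contrast can be reproduced without Scalaz at all. The block below is only a sketch: `IO`, `EitherIO` and `attempt` are hypothetical stand-ins written for this illustration (they are not the Scalaz types, and `attempt` only loosely mirrors the question's `tryIO`). It runs the question's a / exception / b sequence twice, once with the errors carried as plain values inside `IO`, and once through an EitherT-like wrapper whose `flatMap` stops at the first `Left`.

```scala
object EitherTSketch {
  // Toy IO: a suspended side effect. A stand-in, NOT scalaz.effect.IO.
  final case class IO[A](unsafeRun: () => A) {
    def map[B](f: A => B): IO[B] = IO(() => f(unsafeRun()))
    def flatMap[B](f: A => IO[B]): IO[B] = IO(() => f(unsafeRun()).unsafeRun())
  }

  // Toy EitherT specialised to IO. A stand-in, NOT scalaz.EitherT.
  final case class EitherIO[E, A](run: IO[Either[E, A]]) {
    def map[B](f: A => B): EitherIO[E, B] =
      EitherIO(run.map[Either[E, B]] {
        case Left(e)  => Left(e)
        case Right(a) => Right(f(a))
      })
    def flatMap[B](f: A => EitherIO[E, B]): EitherIO[E, B] =
      EitherIO(run.flatMap[Either[E, B]] {
        case Left(e)  => IO[Either[E, B]](() => Left(e)) // first Left wins; f never runs
        case Right(a) => f(a).run
      })
  }

  // Catches exceptions into a Left, in the spirit of the question's tryIO.
  def attempt[A](body: => A): IO[Either[Throwable, A]] =
    IO[Either[Throwable, A]](() => try Right(body) catch { case e: Exception => Left(e) })

  // Errors kept as plain values inside IO: IO's flatMap ignores them.
  def unchained(): (List[String], Either[Throwable, Unit]) = {
    val log = scala.collection.mutable.ListBuffer.empty[String]
    def say(s: String): Unit = { log += s; () }
    val it = for {
      _ <- attempt(say("a"))
      _ <- attempt[Unit](throw new Exception("!"))
      r <- attempt(say("b"))
    } yield r
    val result = it.unsafeRun()
    (log.toList, result)
  }

  // Same steps, but sequenced with EitherIO's flatMap.
  def chained(): (List[String], Either[Throwable, Unit]) = {
    val log = scala.collection.mutable.ListBuffer.empty[String]
    def say(s: String): Unit = { log += s; () }
    val it = for {
      _ <- EitherIO(attempt(say("a")))
      _ <- EitherIO(attempt[Unit](throw new Exception("!")))
      r <- EitherIO(attempt(say("b")))
    } yield r
    val result = it.run.unsafeRun()
    (log.toList, result)
  }
}
```

`EitherTSketch.unchained()` logs both "a" and "b" and ends in a `Right`, which is the behaviour the question complains about. `EitherTSketch.chained()` logs only "a" and ends in a `Left` holding the exception, because the wrapper's `flatMap` is the one doing the sequencing.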

OTHER TIPS

The way pipes does it is to abstract composition behind the Channel type class:

class Channel p where
    {-| 'idT' acts like a \'T\'ransparent proxy, passing all requests further
        upstream, and passing all responses further downstream. -}
    idT :: (Monad m) => a' -> p a' a a' a m r

    {-| Compose two proxies, satisfying all requests from downstream with
        responses from upstream. -}
    (>->) :: (Monad m)
          => (b' -> p a' a b' b m r)
          -> (c' -> p b' b c' c m r)
          -> (c' -> p a' a c' c m r)
    p1 >-> p2 = p2 <-< p1

... and then derive a lifted composition over EitherT from the base composition. This is a special case of the principle of proxy transformers, introduced in pipes-2.4, which allows lifting composition over arbitrary extensions.

This lifting requires defining an EitherT specialized to the shape of the Proxy type in Control.Proxy.Trans.Either:

newtype EitherP e p a' a b' b (m :: * -> *) r
  = EitherP { runEitherP :: p a' a b' b m (Either e r) }

This specialization to the Proxy shape is necessary in order to be able to define a well-typed instance of the Channel class. Scala might be more flexible in this regard than Haskell.

Then I just redefine the Monad instance (and other instances) along with all the ordinary EitherT operations for this specialized type:

throw :: (Monad (p a' a b' b m)) => e -> EitherP e p a' a b' b m r
throw = EitherP . return . Left

catch
 :: (Monad (p a' a b' b m))
 => EitherP e p a' a b' b m r        -- ^ Original computation
 -> (e -> EitherP f p a' a b' b m r) -- ^ Handler
 -> EitherP f p a' a b' b m r        -- ^ Handled computation
catch m f = EitherP $ do
    e <- runEitherP m
    runEitherP $ case e of
        Left  l -> f     l
        Right r -> right r
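For readers more at home in Scala, the shape of throw and catch can be sketched roughly as follows. This is a hypothetical, library-free analogue over a thunk that returns an `Either`; it is not the pipes API and not anything from Scalaz, and the names `Step`, `throwE`, `right` and `catchE` are made up for the illustration. As in the Haskell version, the handler may change the error type (here from `E` to `F`).

```scala
object ThrowCatchSketch {
  // Hypothetical stand-in for a computation that may fail with E or return R.
  type Step[E, R] = () => Either[E, R]

  // Fail immediately with the given error.
  def throwE[E, R](e: E): Step[E, R] = () => Left(e)

  // Succeed immediately with the given value.
  def right[E, R](r: R): Step[E, R] = () => Right(r)

  // Run m; on Left, hand the error to the handler and run what it returns.
  def catchE[E, F, R](m: Step[E, R])(handler: E => Step[F, R]): Step[F, R] =
    () => m() match {
      case Left(l)  => handler(l)()
      case Right(r) => Right(r)
    }

  // A failing step whose String error is handled by returning its length.
  def example(): Either[Throwable, Int] =
    catchE(throwE[String, Int]("boom"))(msg => right[Throwable, Int](msg.length))()
}
```

Here `ThrowCatchSketch.example()` evaluates to `Right(4)`: the `Left("boom")` never escapes, because the handler replaces it with a successful step.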

With this in hand I can then define the following lifted composition instance:

-- Given that 'p' is composable, so is 'EitherP e p'
instance (Channel p) => Channel (EitherP e p) where
    idT = EitherP . idT
    p1 >-> p2 = (EitherP .) $ runEitherP . p1 >-> runEitherP . p2

To understand what is going on there, just follow the types:

p1 :: b' -> EitherP e p a' a b' b m r
p2 :: c' -> EitherP e p b' b c' c m r

runEitherP . p1 :: b' -> p a' a b' b m (Either e r)
runEitherP . p2 :: c' -> p b' b c' c m (Either e r)

-- Use the base composition for 'p'
runEitherP . p1 >-> runEitherP . p2
 :: c' -> p a' a c' c m (Either e r)

-- Rewrap in EitherP
(EitherP . ) $ runEitherP . p1 >-> runEitherP . p2
 :: c' -> EitherP e p a' a c' c m r

This lets you throw and catch errors within a particular stage without interrupting other stages. Here's an example I've copied and pasted from my pipes-2.4 announcement post:

import Control.Monad (forever)
import Control.Monad.Trans (lift)
import Control.Proxy
import Control.Proxy.Trans.Either as E
import Safe (readMay)

promptInts :: () -> EitherP String Proxy C () () Int IO r
promptInts () = recover $ forever $ do
    str <- lift getLine
    case readMay str of
        Nothing -> E.throw "Could not parse an integer"
        Just n  -> liftP $ respond n

recover p =
    p `E.catch` (\str -> lift (putStrLn str) >> recover p)

main = runProxy $ runEitherK $ mapP printD <-< promptInts

Here's the result:

>>> main
1<Enter>
1
Test<Enter>
Could not parse an integer
Apple<Enter>
Could not parse an integer
5<Enter>
5
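A loose Scala analogue of that session, using only the standard library, might look like the sketch below. It is not a port of the pipes code: `parseInt`, `promptInts` and `run` are hypothetical names, and a lazy `Iterator` stands in for the proxy stages. The point it tries to show is the same one, namely that a parse failure is reported and handled inside the upstream stage while the downstream stage keeps receiving values.

```scala
object RecoverSketch {
  // Parse one line, turning a bad number into a Left instead of an exception.
  def parseInt(s: String): Either[String, Int] =
    try Right(s.trim.toInt)
    catch { case _: NumberFormatException => Left("Could not parse an integer") }

  // Upstream stage: errors are reported and dropped here,
  // so only successfully parsed values flow downstream.
  def promptInts(lines: Iterator[String], report: String => Unit): Iterator[Int] =
    lines.flatMap { line =>
      parseInt(line) match {
        case Left(msg) => report(msg); None
        case Right(n)  => Some(n)
      }
    }

  // Downstream stage "prints" each Int; both stages write to the same output,
  // collected in a list so the interleaving can be inspected.
  def run(input: List[String]): List[String] = {
    val out = scala.collection.mutable.ListBuffer.empty[String]
    promptInts(input.iterator, msg => out += msg).foreach(n => out += n.toString)
    out.toList
  }
}
```

Calling `RecoverSketch.run(List("1", "Test", "Apple", "5"))` yields the lines "1", "Could not parse an integer", "Could not parse an integer", "5", in that order, matching the session above.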

The answer to the iteratee approach is similar. You must take your existing way of composing iteratees and lift it over EitherT. Whether or not you use type-classes or just define a new composition operator is up to you.


Licensed under: CC-BY-SA with attribution