Question

I have heard that iteratees are lazy, but how lazy exactly are they? Alternatively, can iteratees be fused with a postprocessing function, so that an intermediate data structure does not have to be built?

Can I in my iteratee for example build a 1 million item Stream[Option[String]] from a java.io.BufferedReader, and then subsequently filter out the Nones, in a compositional way, without requiring the entire Stream to be held in memory? And at the same time guarantee that I don't blow the stack? Or something like that - it doesn't have to use a Stream.

I'm currently using Scalaz 6 but if other iteratee implementations are able to do this in a better way, I'd be interested to know.

Please provide a complete solution, including closing the BufferedReader and calling unsafePerformIO, if applicable.

Was it helpful?

Solution

Here's a quick iteratee example using the Scalaz 7 library that demonstrates the properties you're interested in: constant memory and stack usage.

The problem

First assume we've got a big text file with a string of decimal digits on each line, and we want to find all the lines that contain at least twenty zeros. We can generate some sample data like this:

val w = new java.io.PrintWriter("numbers.txt")
val r = new scala.util.Random(0)

(1 to 1000000).foreach(_ =>
  w.println((1 to 100).map(_ => r.nextInt(10)).mkString)
)

w.close()

Now we've got a file named numbers.txt. Let's open it with a BufferedReader:

val reader = new java.io.BufferedReader(new java.io.FileReader("numbers.txt"))

It's not excessively large (~97 megabytes), but it's big enough for us to see easily whether our memory use is actually staying constant while we process it.

Setting up our enumerator

First for some imports:

import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I }

And an enumerator (note that I'm changing the IoExceptionOrs into Options for the sake of convenience):

val enum = I.enumReader(reader).map(_.toOption)

Scalaz 7 doesn't currently provide a nice way to enumerate a file's lines, so we're chunking through the file one character at a time. This will of course be painfully slow, but I'm not going to worry about that here, since the goal of this demo is to show that we can process this large-ish file in constant memory and without blowing the stack. The final section of this answer gives an approach with better performance, but here we'll just split on line breaks:

val split = I.splitOn[Option[Char], List, IO](_.cata(_ != '\n', false))

And if the fact that splitOn takes a predicate that specifies where not to split confuses you, you're not alone. split is our first example of an enumeratee. We'll go ahead and wrap our enumerator in it:

val lines = split.run(enum).map(_.sequence.map(_.mkString))

Now we've got an enumerator of Option[String]s in the IO monad.

Filtering the file with an enumeratee

Next for our predicate—remember that we said we wanted lines with at least twenty zeros:

val pred = (_: String).count(_ == '0') >= 20

We can turn this into a filtering enumeratee and wrap our enumerator in that:

val filtered = I.filter[Option[String], IO](_.cata(pred, true)).run(lines)

We'll set up a simple action that just prints everything that makes it through this filter:

val printAction = (I.putStrTo[Option[String]](System.out) &= filtered).run

Of course we've not actually read anything yet. To do that we use unsafePerformIO:

printAction.unsafePerformIO()

Now we can watch the Some("0946943140969200621607610...")s slowly scroll by while our memory usage remains constant. It's slow and the error handling and output are a little clunky, but not too bad I think for about nine lines of code.

Getting output from an iteratee

That was the foreach-ish usage. We can also create an iteratee that works more like a fold—for example gathering up the elements that make it through the filter and returning them in a list. Just repeat everything above up until the printAction definition, and then write this instead:

val gatherAction = (I.consume[Option[String], IO, List] &= filtered).run

Kick that action off:

val xs: Option[List[String]] = gatherAction.unsafePerformIO().sequence

Now go get a coffee (it might need to be pretty far away). When you come back you'll either have a None (in the case of an IOException somewhere along the way) or a Some containing a list of 1,943 strings.

Complete (faster) example that automatically closes the file

To answer your question about closing the reader, here's a complete working example that's roughly equivalent to the second program above, but with an enumerator that takes responsibility for opening and closing the reader. It's also much, much faster, since it reads lines, not characters. First for imports and a couple of helper methods:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, Either[Throwable, B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

def enumBuffered(r: => BufferedReader) =
  new EnumeratorT[Either[Throwable, String], IO] {
    lazy val reader = r
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(reader.readLine())).flatMap {
          case Right(null) => s.pointI
          case Right(line) => k(I.elInput(Right(line))) >>== apply[A]
          case e => k(I.elInput(e))
        }
    )
  }

And now the enumerator:

def enumFile(f: File): EnumeratorT[Either[Throwable, String], IO] =
  new EnumeratorT[Either[Throwable, String], IO] {
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(new BufferedReader(new FileReader(f)))).flatMap {
          case Right(reader) => I.iterateeT(
            enumBuffered(reader).apply(s).value.ensuring(IO(reader.close()))
          )
          case Left(e) => k(I.elInput(Left(e)))
        }
      )
  }

And we're ready to go:

val action = (
  I.consume[Either[Throwable, String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 20)) &=
  enumFile(new File("numbers.txt"))
).run

Now the reader will be closed when the processing is done.

OTHER TIPS

I should have read a little bit further... this is precisely what enumeratees are for. Enumeratees are defined in Scalaz 7 and Play 2, but not in Scalaz 6.

Enumeratees are for "vertical" composition (in the sense of "vertically integrated industry") while ordinary iteratees compose monadically in a "horizontal" way.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top