Question

I'm trying to use the scalaz iteratee package to process a large zip file in constant space. I have a long-running process I need to perform on each file in the zip file. Those processes can (and should) be run in parallel.

I created an EnumeratorT that inflates each ZipEntry into a File object. The signature looks like:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]

I want to attach an IterateeT that will perform the long-running process on each file. I basically end up with something like:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

When I try to run it:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

I get a java.lang.OutOfMemoryError: Java heap space message. That makes sense to me, since it's trying to build up a massive list in memory of all these IO and Promise objects.

A few questions:

  • Does anyone have any ideas on how to avoid this? It feels like I'm approaching the problem incorrectly, because I really only care about the longRunningProcess for its side-effects.
  • Is the Enumerator approach here the wrong approach?

I'm pretty much out of ideas, so anything will help.

Thanks!

Update #1

Here is the stack trace:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

I am currently taking the advice of nadavwr to ensure everything is acting like I think it is. I will report back any updates.

Update #2

Using ideas from both the answers below, I found a decent solution. As huynhjl suggested (and I verified using nadavwr's suggestion of analyzing the heap dump), consume was causing every inflated ZipEntry to be held in memory, which is why the process was running out of memory. I changed consume to foldM and updated the long-running process to just return a Promise[IOE[Unit]] instead of a reference to the file. That way I have a collection of all IoExceptions at the end. Here is the working solution:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }

This solution inflates each entry while asynchronously uploading them. At the end, I have a huge list of fulfilled Promise objects that contain any errors. I still not fully convinced this is the correct use of an Iteratee, but I do now have several reusable, composeable pieces that I can use in other pieces of our system (this is a very common pattern for us).

Thanks for all your help!

Was it helpful?

Solution

Don't use consume. See my other recent answer: How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldM may be a better choice.

Also try to map the file to something else (like a success return code) to see if that allows the JVM to garbage collect the inflated zip entries.

OTHER TIPS

How expensive (in terms of memory is your longRunningProcess? How about file deflation? Are they being executed the number of times you expect them to be? (a simple counter would be helpful)

A stack trace will be helpful to determine the straw that broke the camel's back -- sometimes that's the culprit.

If you want to be certain what's taking up so much memory, you can use the -XX:+HeapDumpOnOutOfMemoryError JVM argument and then analyze it with VisualVM, Eclipse MAT, or other heap analyzers.

Followup

It does seem strange to me that you are enumerating promises. It's counterintuitive to kick off a computation independent of both the enumerator and the iteratee. An iteratee-based solution might be better served by an enumerator that returns 'inert' elements instead of promises. Unfortunately, that would make your handling of individual files serial, but that's iteratees for ya -- non-blocking stream processing.

An actor-based solution would fit better IMHO, but both actors and iteratees (especially the latter) seem overkill for what you are trying to accomplish (at least the parts you are sharing).

Please consider plain futures/promises from Scala 2.10's scala.concurrent package, and be sure to take a look at Scala's parallel collections as well. I wouldn't introduce additional concepts into the code before these prove insufficient. Try defining a fixed-size ExecutionContext for constraining your parallelism.

I started out the answer after a quick read through, and somehow had 'stack overflow' stuck in my mind instead of 'out of memory error' ... Must be the URL :-)

Still, functional computations that rely on recursions are susceptible to stack overflows, so I've left the answer in place for any body stumbling across, and promise to try to come up with a more relevant answer.

If what you got was a stack overflow, you'd be needing a 'trampoline', a construct that boosts your computation out of the stack between recursions.

See section titled "Stackless Scala with Free Monads" in Learning Scalaz Day 18, part of @eed3si9n's excellent series of posts.

See also this gist by @mpilquist, demonstrating a trampolined iteratee.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top