Question

I am trying to process a stream of data using scalaz-stream with an expensive operation※.

scala> :paste
// Entering paste mode (ctrl-D to finish)

    def expensive[T](x:T): T = {
      println(s"EXPENSIVE! $x")
      x
    }
    ^D
// Exiting paste mode, now interpreting.

expensive: [T](x: T)T

※ Yes, yes, I know mixing side effects into code is bad functional programming style. The print statements are only there to track how many times expensive() gets called.
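If you would rather have a number than count println lines, you can also attach a call counter to expensive(). The sketch below is a small variation using only the standard library. The Tracked object and its calls counter are names made up for this example.

```scala
import java.util.concurrent.atomic.AtomicInteger

object Tracked {
  // Incremented on every call, so the count can be read or asserted on directly.
  val calls = new AtomicInteger(0)

  def expensive[T](x: T): T = {
    calls.incrementAndGet()
    println(s"EXPENSIVE! $x")
    x
  }
}
```

In the stream you would map Tracked.expensive in place of expensive, and then read Tracked.calls.get after runLog.run.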

Before passing the data to the expensive operation, I first need to split it into chunks.

scala> val chunked: Process[Task,Vector[Int]] = Process.range(0,4).chunk(2)
chunked: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@7ef516f3,<function1>,Emit(SeqView(...),Halt(scalaz.stream.Process$End$)),Emit(SeqView(...),Halt(scalaz.stream.Process$End$)))

scala> chunked.runLog.run
res1: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())

Then I map the expensive operation onto the stream of chunks.

scala> val processed = chunked.map(expensive)
processed: scalaz.stream.Process[scalaz.concurrent.Task,Vector[Int]] = Await(scalaz.concurrent.Task@7ef516f3,<function1>,Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)),Emit(SeqViewM(...),Halt(scalaz.stream.Process$End$)))

When I execute this, it calls expensive() the expected number of times:

scala> processed.runLog.run
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
res2: scala.collection.immutable.IndexedSeq[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())

However, if I chain a call to zipWithIndex, expensive() gets called many more times:

scala> processed.zipWithIndex.runLog.run
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0)
EXPENSIVE! Vector(0, 1)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2)
EXPENSIVE! Vector(2, 3)
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
EXPENSIVE! Vector()
res3: scala.collection.immutable.IndexedSeq[(Vector[Int], Int)] = Vector((Vector(0, 1),0), (Vector(2, 3),1), (Vector(),2))
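The value of res3 is what you would expect. It matches what zipWithIndex gives on an ordinary Scala Vector holding the same chunks, as the plain standard-library check below shows (no scalaz-stream involved). The only difference is how many times expensive() ran.

```scala
object ZipCheck {
  // The chunks from res2, indexed with ordinary Scala collections.
  val chunks: Vector[Vector[Int]] = Vector(Vector(0, 1), Vector(2, 3), Vector())
  val indexed: Vector[(Vector[Int], Int)] = chunks.zipWithIndex
}
```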

Is this a bug? If it is the desired behavior, can anybody explain why? If expensive() takes a long time, you can see why I would prefer the result with fewer calls.

Here is a gist with more examples: https://gist.github.com/underspecified/11279251


Solution

You're running into a known scalaz-stream issue, which can take a number of different forms. Essentially, map can observe (and act on) the intermediate states that chunk passes through while it builds up its results, which is why expensive() is called on partial chunks such as Vector() and Vector(0).
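To make the idea concrete, here is a toy model in plain Scala (no scalaz-stream). It is a hypothetical illustration, not how chunk is actually implemented. The chunker exposes every intermediate buffer state, each tagged with whether it is a finished chunk. Whether expensive() sees the partial states depends on whether the mapping step runs before or after they are discarded. The model reproduces the shape of the output above (Vector(), Vector(0), Vector(0, 1), ...) but not the exact repeat counts.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model, not scalaz-stream's real implementation.
object ChunkModel {
  // Records every argument expensive() receives.
  val calls = ArrayBuffer.empty[Vector[Int]]

  def expensive(v: Vector[Int]): Vector[Int] = { calls += v; v }

  // Every buffer state of a size-n chunker, paired with "is this a complete chunk?".
  def chunkSteps(xs: Seq[Int], n: Int): Vector[(Vector[Int], Boolean)] = {
    val steps = Vector.newBuilder[(Vector[Int], Boolean)]
    var buf = Vector.empty[Int]
    for (x <- xs) {
      steps += ((buf, false)) // partial state seen before x is added
      buf = buf :+ x
      if (buf.size == n) {
        steps += ((buf, true)) // complete chunk that will be emitted
        buf = Vector.empty
      }
    }
    steps.result()
  }

  // Mapping before the partial states are dropped: expensive() also runs on them.
  def leakyMap(xs: Seq[Int], n: Int): Vector[Vector[Int]] =
    chunkSteps(xs, n)
      .map { case (b, done) => (expensive(b), done) }
      .collect { case (b, true) => b }

  // Partial states dropped first: expensive() runs once per emitted chunk.
  def cleanMap(xs: Seq[Int], n: Int): Vector[Vector[Int]] =
    chunkSteps(xs, n).collect { case (b, true) => b }.map(expensive)
}
```

Both versions return the same chunks; only the arguments recorded in ChunkModel.calls differ.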

This behavior may change in the future, but in the meantime there are a couple of possible workarounds. One of the simplest is to wrap your expensive function in a process and use flatMap instead of map:

import scalaz.concurrent.Task
import scalaz.stream.Process

chunked.flatMap(a =>
  Process.eval(Task.delay(expensive(a)))
).zipWithIndex.runLog.run
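Roughly, this helps because Process.eval(Task.delay(...)) turns the call into an effect that the driver runs once for each element it actually reaches, rather than a function that gets applied wherever map's intermediate results are inspected. The plain-Scala analogy below is only an illustration under stated assumptions: a thunk stands in for Task.delay, and a hard-coded list of steps stands in for chunk's internal states. SuspendModel and its members are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// Analogy only: building a thunk is cheap, and expensive() runs only when a thunk is invoked.
object SuspendModel {
  val calls = ArrayBuffer.empty[Vector[Int]]

  def expensive(v: Vector[Int]): Vector[Int] = { calls += v; v }

  // (state, isCompleteChunk) pairs that a size-2 chunker might pass through.
  val steps: Vector[(Vector[Int], Boolean)] = Vector(
    (Vector(), false), (Vector(0), false), (Vector(0, 1), true),
    (Vector(), false), (Vector(2), false), (Vector(2, 3), true)
  )

  def run(): Vector[(Vector[Int], Int)] = {
    // Every step gets a suspended computation, but nothing runs yet.
    val suspended = steps.map { case (b, done) => (() => expensive(b), done) }
    // Only the thunks for complete chunks are invoked, once each.
    suspended.collect { case (task, true) => task() }.zipWithIndex
  }
}
```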

Another solution is to wrap your expensive function in an effectful channel:

def expensiveChannel[A] = Process.constant((a: A) => Task.delay(expensive(a)))

Now you can use through:

chunked.through(expensiveChannel).zipWithIndex.runLog.run
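In scalaz-stream a channel is, roughly, a stream of effectful functions (Process[F, I => F[O]]), and through feeds each element to the next function and runs the resulting effect. The plain-Scala model below uses an Iterator for the stream and a thunk for Task. constantChannel and through here are hypothetical stand-ins, not the library's signatures. It shows each chunk reaching expensive() exactly once, even with zipWithIndex downstream.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of a channel: an endless supply of functions A => (() => B).
object ChannelModel {
  val calls = ArrayBuffer.empty[Vector[Int]]

  def expensive(v: Vector[Int]): Vector[Int] = { calls += v; v }

  // Like Process.constant(f): the same function, repeated forever.
  def constantChannel[A, B](f: A => (() => B)): Iterator[A => (() => B)] =
    Iterator.continually(f)

  // Like through: pair each element with the next function and run its effect once.
  def through[A, B](xs: Iterator[A], ch: Iterator[A => (() => B)]): Iterator[B] =
    xs.zip(ch).map { case (a, f) => f(a)() }

  // A def, not a val, because an Iterator can only be consumed once.
  def expensiveChannel: Iterator[Vector[Int] => (() => Vector[Int])] =
    constantChannel((a: Vector[Int]) => () => expensive(a))
}
```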

While the current behavior can be a little surprising, it's also a good reminder that you should be using the type system to help you track all the effects you care about (and long-running computation can be one of these).

Licensed under: CC-BY-SA with attribution