Question

I'm taking the Reactive Programming course on Coursera, and while doing one of the assignments I came across something weird. I added a few methods to the Future companion object via this extension:

implicit class FutureCompanionOps[T](val f: Future.type) extends AnyVal {

    /** Returns a future that is always completed with `value`.
     */
    def always[T](value: T): Future[T] = Future(value)

    /** Returns a future that is never completed.
     *
     *  This future may be useful when testing if timeout logic works correctly.
     */
    def never[T]: Future[T] = Promise().future


    /** Given a list of futures `fs`, returns the future holding the list of values of all the futures from `fs`.
     *  The returned future is completed only once all of the futures in `fs` have been completed.
     *  The values in the list are in the same order as corresponding futures `fs`.
     *  If any of the futures `fs` fails, the resulting future also fails.
     */
    def all[T](fs: List[Future[T]]): Future[List[T]] = {
      val resPr = Promise[List[T]]()
      def function( in: List[Future[T]], fxs:Future[List[T]] ): Future[List[T]] =
      {
        if(in.isEmpty) fxs 
        else
        function( in.tail, for { i <- in.head ; xs <- fxs } yield { i :: xs } ) 
      }
      function( fs, resPr.success(Nil).future )
    }
}

I then wrote this in a Scala worksheet in Eclipse:

object TestSheet {

val tempPr = Promise[Boolean]()      
val anotherFuLs = List( Future.always(true), Future.always(false), tempPr.future )
                                                  //> anotherFuLs  : List[scala.concurrent.Future[Boolean]] = List(scala.concurren
                                                  //| t.impl.Promise$DefaultPromise@a19b1de, scala.concurrent.impl.Promise$Default
                                                  //| Promise@1cec6b00, scala.concurrent.impl.Promise$DefaultPromise@625dcec6)
  val crapFut = Future.all(anotherFuLs)           //> crapFut  : scala.concurrent.Future[List[Boolean]] = scala.concurrent.impl.Pr
                                                  //| omise$DefaultPromise@6564dbd5
  crapFut.isCompleted                             //> res3: Boolean = false
  tempPr.success(false)                           //> res4: nodescala.TestSheet.tempPr.type = scala.concurrent.impl.Promise$Defaul
                                                  //| tPromise@625dcec6
  crapFut.isCompleted                             //> res5: Boolean = true
  crapFut onComplete {
    case Success(ls) => println( ls )
    case Failure(e) => println( "Failed with Exception " + e )
  }
} 

No matter what I do, I can't get the Scala worksheet to print the values of the resulting list. However, when I write a unit test and run it with ScalaTest, I have no problem comparing the final resulting list. Is this a bug in the Scala worksheet when working with asynchronous code?

This is the unit test

test("A composed future with all should complete when all futures complete") {
    val tempPr = Promise[Boolean]()
    val lsFu = List( Future.always(true), Future.always(false), tempPr.future );
    val fuL = Future.all( lsFu )
    fuL onComplete { case Success(ls) => println( "This got done" ); assert( ls === List( true, false, true ), "I should get back the expected List" ) 
                     case Failure(ex) => assert( false, "Failed with Exception " + ex ) } 
    assert( fuL.isCompleted === false, "The resulting Future should not be complete when the depending futures are still pending" )    
    tempPr.success(true)

  }

Solution

It looks like the problem is that the main thread that runs your worksheet code is ending before the onComplete handler gets run.

Scala's default ExecutionContext is essentially a thread pool full of daemon threads. "Daemon" in this context means that even if that thread is busy doing something, it won't prevent the JVM from shutting down when all non-daemon threads finish. In your case, the main thread is probably the only non-daemon thread in the program.
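To see the daemon flag for yourself, here is a minimal sketch, assuming the global ExecutionContext is the one in scope. The object and method names (DaemonCheck, runsOnDaemonThread) are made up for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DaemonCheck {
  // Runs a Future body on the global ExecutionContext and reports whether
  // the thread that executed it is a daemon thread.
  // The 5-second timeout is an arbitrary value chosen for this sketch.
  def runsOnDaemonThread(): Boolean =
    Await.result(Future { Thread.currentThread().isDaemon }, 5.seconds)

  def main(args: Array[String]): Unit = {
    println("calling thread is daemon: " + Thread.currentThread().isDaemon)
    println("pool thread is daemon: " + runsOnDaemonThread())
  }
}
```

If the pool thread reports itself as a daemon, nothing it is doing will keep the JVM alive once the calling thread returns.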

Calling onComplete on a Future will make it so that the implicitly-provided ExecutionContext will execute your handler when the Future completes. This means the handler runs on a daemon thread. Since onComplete is the last thing you do in your main method, the JVM is simply finishing before the ExecutionContext gets around to running the handler.

Normally, this isn't a big deal. In a scenario like a web server, your JVM will be up and running for a long time. For your use case, I'd recommend blocking for the Future to complete, by using one of the methods in scala.concurrent.Await. That way you can run your completion logic as a part of the main thread, in the main method.
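A minimal sketch of the blocking approach, under a few assumptions: the standard library's Future.sequence stands in for the question's `all`, the 5-second timeout is arbitrary, and the names AwaitSketch and blockFor are made up for this example.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Blocks the calling thread until `f` completes or the timeout elapses.
  // Throws a TimeoutException if the future is still pending after 5 seconds.
  def blockFor[T](f: Future[T]): T = Await.result(f, 5.seconds)

  def main(args: Array[String]): Unit = {
    val pending = Promise[Boolean]()
    // Future.sequence is used here in place of the question's Future.all.
    val combined: Future[List[Boolean]] =
      Future.sequence(List(Future(true), Future(false), pending.future))

    pending.success(false)

    // The value is printed by the calling thread, so the JVM cannot exit
    // before the output appears.
    println(blockFor(combined)) // prints List(true, false, false)
  }
}
```

In the worksheet, the equivalent would be something like Await.result(crapFut, 5.seconds) as the last expression, so the result shows up as an ordinary worksheet value rather than as output from a callback.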

OTHER TIPS

IntelliJ IDEA has a similar problem, and it applies to both the worksheet and even running an application from inside IDEA.

The key is the libraries that are on the classpath when your code is run. The scala command gets turned into something like:

execCommand /opt/jdk1.7.0_45/bin/java -Xmx256M -Xms32M -Xbootclasspath/a:
/opt/scala-2.10.3/lib/akka-actors.jar:
/opt/scala-2.10.3/lib/diffutils.jar:
/opt/scala-2.10.3/lib/jline.jar:
/opt/scala-2.10.3/lib/scala-actors.jar:
/opt/scala-2.10.3/lib/scala-actors-migration.jar:
/opt/scala-2.10.3/lib/scala-compiler.jar:
/opt/scala-2.10.3/lib/scala-library.jar:
/opt/scala-2.10.3/lib/scala-partest.jar:
/opt/scala-2.10.3/lib/scalap.jar:
/opt/scala-2.10.3/lib/scala-reflect.jar:
/opt/scala-2.10.3/lib/scala-swing.jar:
/opt/scala-2.10.3/lib/typesafe-config.jar 
-classpath "" -Dscala.home=/opt/scala-2.10.3 -Dscala.usejavacp=true scala.tools.nsc.MainGenericRunner 
-cp out/production/Sheets testing.SequenceMain 400

The problem is that the IDEs do not do the equivalent of the scala command, and the net result is that Await.result() does not wait for the work on the daemon threads to complete.

Here is a concrete example (also inspired by the Reactive course):

package testing

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Promise, Future}
import scala.concurrent.duration.Duration

object SequenceMain {

  def main(args: Array[String]) {
    val delay = if (args.length > 0) args(0).toInt else 100
    if (args.length > 1) {
      println("Delays are: " + (List(1.0, 1.2, 0.8) map {df => (delay * df).toInt} mkString ", "))
    }

    val start = System.currentTimeMillis()
    var last = start

    def stamp(s: String) {
      val tn = Thread.currentThread().getName
      val now = System.currentTimeMillis()
      println(s"$tn: ${now-start} / ${now - last}: $s")
      last = now
    }

    def sequence[T](fs: List[Future[T]]): Future[List[T]] = {
      val pr = Promise[List[T]]()
      pr.success(Nil)
      val r: Future[List[T]] = fs.foldRight(pr.future) {
        (ft: Future[T], z: Future[List[T]]) =>
          val result: Future[List[T]] = for (t <- ft; ts <- z) yield {
            t :: ts
          }
          result
      }
      r
    }

    stamp("Making sequence of futures.")
    val fts: List[Future[String]] = List(
      Future[String] {
        Thread.sleep((delay * 1.0).toInt)
        "Future 0"
      },
      Future[String] {
        Thread.sleep((delay * 1.2).toInt)
        if (false) throw new Exception("Blew up")
        else "Future 1"
      },
      Future[String] {
        Thread.sleep((delay * 0.8).toInt)
        "Future 2"
      }
    )


    stamp("Making Future sequence.")
    val a1: Future[List[String]] = sequence(fts)

    stamp("Extracting sequence from future.")
    a1 foreach {
      (z: List[String]) => println("And the result is : " + z)
    }

    stamp("Await result.")
    Await.result(a1, Duration(10, "seconds"))
    stamp("Awaited result.")

  }
}

Running this app inside IDEA produces:

/opt/jdk1.7.0_45/bin/java -Didea.launcher.port=7541 -Didea.launcher.bin.path=/opt/idea/idea-IU-134.1160/bin -Dfile.encoding=UTF-8 -classpath /home/mkh/IdeaProjects/Sheets/out/production/Sheets:/opt/scala-2.10.3/lib/scala-library.jar:/opt/jdk1.7.0_45/jre/lib/rt.jar:/opt/idea/idea-IU-134.1160/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain testing.SequenceMain 400
main: 0 / 0: Making sequence of futures.
main: 87 / 87: Making Future sequence.
main: 90 / 3: Extracting sequence from future.
main: 90 / 0: Await result.
main: 562 / 472: Awaited result.

Process finished with exit code 0

Note that the "And the result is" println is not printed.

However if the code is run directly, the result is:

mkh@rock:~/IdeaProjects/Sheets$ scala -cp out/production/Sheets:/opt/scala-2.10.3/lib/scala-library.jar testing.SequenceMain 400 
main: 1 / 1: Making sequence of futures.
main: 9 / 8: Making Future sequence.
main: 10 / 1: Extracting sequence from future.
main: 10 / 0: Await result.
main: 491 / 481: Awaited result.
And the result is : List(Future 0, Future 1, Future 2)

Note that the time spent waiting is a bit longer, and the println is actually completed.

Even more bizarre, a seemingly unrelated println makes this example work, even without using the scala command:

/opt/jdk1.7.0_45/bin/java -Didea.launcher.port=7539 -Didea.launcher.bin.path=/opt/idea/idea-IU-134.1160/bin -Dfile.encoding=UTF-8 -classpath /home/mkh/IdeaProjects/Sheets/out/production/Sheets:/opt/scala-2.10.3/lib/scala-library.jar:/opt/jdk1.7.0_45/jre/lib/rt.jar:/opt/idea/idea-IU-134.1160/lib/idea_rt.jar com.intellij.rt.execution.application.AppMain testing.SequenceMain 400 1
Delays are: 400, 480, 320
main: 1 / 1: Making sequence of futures.
main: 59 / 58: Making Future sequence.
main: 62 / 3: Extracting sequence from future.
main: 62 / 0: Await result.
And the result is : List(Future 0, Future 1, Future 2)
main: 543 / 481: Awaited result.

Process finished with exit code 0

The example now prints the delays as its first action (to confirm that 480 ms is needed to let the slowest Future complete), but somehow this initial println has the side effect of making the final Await work.

Someone a lot smarter than I will have to explain this last piece...
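One possible explanation, offered as an assumption rather than a confirmed diagnosis: Await.result(a1, ...) only waits for a1 itself, while the foreach callback is scheduled separately on a daemon thread, so main may return before the callback gets to print. If that is what is happening, waiting on a future that completes only after the side effect has run should make the output reliable. A sketch, with a made-up helper named `reported` and Future.sequence standing in for the hand-written `sequence`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitCallbackSketch {
  // Hypothetical helper: applies `report` to the result and returns a future
  // that completes only after `report` has finished running.
  def reported[T](f: Future[T])(report: T => Unit): Future[T] =
    f.map { value => report(value); value }

  def main(args: Array[String]): Unit = {
    val a1: Future[List[String]] =
      Future.sequence(List(Future("Future 0"), Future("Future 1"), Future("Future 2")))

    val done = reported(a1) { z => println("And the result is : " + z) }

    // Waiting on `done` instead of `a1` means the println above has already
    // run by the time Await.result returns.
    Await.result(done, 10.seconds)
    println("Awaited result.")
  }
}
```

With this shape the ordering no longer depends on which thread wins the race after a1 completes, which would also fit the observation that an unrelated println changed the outcome.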

Because the worksheet does not print the result of a Future, I suggest writing the result to a file instead. You can then call writeFile instead of println in your worksheet.

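import java.io.FileWriter

// Appends `text` plus a newline to result.txt in the working directory.
def writeFile(text: String): Unit = {
  val fw = new FileWriter("result.txt", true) // true = append mode
  try {
    fw.write(text)
    fw.write("\n")
  }
  finally fw.close()
}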
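For completeness, a sketch of how such a helper might be wired to a future. The `appendLine` variant below takes the path as a parameter, which is my own assumption for illustration, and Future.sequence stands in for the question's `all`. It uses andThen so there is a future to wait on that completes after the file has been written.

```scala
import java.io.FileWriter
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object WriteResultSketch {
  // Hypothetical variant of writeFile: appends `text` plus a newline to `path`.
  def appendLine(path: String, text: String): Unit = {
    val fw = new FileWriter(path, true) // true = append mode
    try fw.write(text + "\n")
    finally fw.close()
  }

  def main(args: Array[String]): Unit = {
    val fut: Future[List[Boolean]] =
      Future.sequence(List(Future(true), Future(false)))

    // andThen returns a future that completes after the side effect has run.
    val logged = fut andThen {
      case Success(ls) => appendLine("result.txt", ls.toString)
      case Failure(e)  => appendLine("result.txt", "Failed with Exception " + e)
    }

    // Wait for the write to finish before the calling thread moves on.
    Await.ready(logged, 5.seconds)
  }
}
```

After running it, result.txt in the working directory should contain one line per completed future chain.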
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow