Question

I'm trying to make a large number of external service calls, each followed up with exception handling and conditional further processing. I thought it would be easy to extend this nice (Asynchronous IO in Scala with futures) example using an .onComplete inside, but it appears that I don't understand something about scoping and/or Futures. Can anyone point me in the right direction please?

#!/bin/bash
scala -feature $0 $@
exit
!#

import scala.concurrent.{future, blocking, Future, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Success, Failure}
import scala.language.postfixOps

val keylist = List("key1", "key2")

val myFuts: List[Future[String]] = keylist.map {
  myid => future {
    // this line simulates an external call which returns a future (retrieval from S3)
    val myfut = future { Thread.sleep(1); "START " + myid}

    var mystr = "This should have been overwritten"
    myfut.onComplete {
      case Failure(ex) => {
        println (s"failed with error: $ex")
        mystr = "FAILED"
      }
      case Success(myval) => {
        mystr = s"SUCCESS $myid: $myval"
        println (mystr)
      }
    }
    mystr
  }
}

val futset: Future[List[String]] = Future.sequence(myFuts)
println (Await.result(futset, 10 seconds))

on my computer (Scala 2.10.4), this prints:

SUCCESS key2: START key2
SUCCESS key1: START key1
List(This should have been overwritten, This should have been overwritten)

I want (order unimportant):

SUCCESS key2: START key2
SUCCESS key1: START key1
List(SUCCESS key2: START key2, SUCCESS key1: START key1)
Was it helpful?

Solution

I would avoid using onComplete and trying to use it to do side-effecting logic on a mutable variable. I would instead map the future and handle the fail case as returning a different value. Here's a slightly modified version of your code, using map on the Future (via a for comprehension) and then using recover to handle the failure case. Hopefully this is what you were looking for:

val keylist = List("key1", "key2")

val myFuts: List[Future[String]] = keylist.map {myid => 

  // this line simulates an external call which returns a future (retrieval from S3)
  val myfut = future { Thread.sleep(1); "START " + myid}
  val result = for (myval <- myfut) yield {
    val res = s"SUCCESS $myid: $myval"
    println(res)
    res
  }
  result.recover{
    case ex => 
      println (s"failed with error: $ex")
      "FAILED"          
  }

}

val futset: Future[List[String]] = Future.sequence(myFuts)
println (Await.result(futset, 10 seconds))

OTHER TIPS

On complete does not return a new future, it just allows you to do something when that future is completed. So your first Future block is returning before the onComplete is executed, thus you are getting back the original value of the string.

What we can do is use a promise, to return another future, and that future is completed by the result of the first future.

  val keylist = List("key1", "key2")

  val myFuts: List[Future[String]] = keylist.map {
    myid => {
      // this line simulates an external call which returns a future (retrieval from S3)
      val myfut = Future {
        Thread.sleep(1); "START " + myid
      }
      var mystr = "This should have been overwritten"
      val p = Promise[String]()
      myfut.onComplete {
        case Failure(ex) =>
          println(s"failed with error: $ex")
          mystr = "FAILED"
          p failure ex
        case Success(myval) =>
          mystr = s"SUCCESS $myid: $myval"
          println(mystr)
          p success myval
      }
      p.future
    }
  }

  val futset: Future[List[String]] = Future.sequence(myFuts)
  println(Await.result(futset, 10 seconds))

What would be super handy would be a mapAll method as I asked about here: Map a Future for both Success and Failure

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top