Question

I have an actor that receives a message and runs two futures. Those futures can run in parallel, so I thought I could use a for comprehension to run both and combine their results into one response to the sender. I can get the result of each future by itself, but I don't know how to aggregate them once both are complete.

def receive = {
    case "pcbStatus" => {
      val currentSender = sender
      //first future
      val wsf = (self ? "workhorseStats")(5 seconds)

      val psf = Future.traverse(context.children)(x => {
        (x ? "reportStatus")(5 seconds)
      });

      val combined = for {
        r1 <- wsf
        r2 <- psf
      } yield (r1, r2)



      combined.onComplete {
        case Success(result:Any) => {
          val response = new SomeCaseClass(r1,r2)
          println("YAY: " + response)
          currentSender ! response
        }
        case Failure(failure) => {
          println("FAIL: " + failure)
        }
      }
   }
}

Solution

I've coded a small example of what I think you are trying to do. First, the two actor classes:

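import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// Aggregated reply sent back to the original sender.
// (Field names are assumed; only the shape is implied by the output below.)
case class SomeResult(parentValue: Int, childValues: List[Int])

class ParentActor extends Actor {
  import context.dispatcher // implicit ExecutionContext for the futures below
  implicit val timeout: Timeout = Timeout(5.seconds)

  override def preStart(): Unit = {
    context.actorOf(Props[ChildActor], "child-a")
    context.actorOf(Props[ChildActor], "child-b")
  }

  def receive = {
    case "foo" =>
      // Both futures are started here, before the for comprehension,
      // so they run concurrently.
      val fut1 = (self ? "bar").mapTo[Int]
      val fut2 = Future.traverse(context.children)(child => (child ? "baz").mapTo[Int])

      // Completes once both fut1 and fut2 have completed
      val aggFut = for {
        f1 <- fut1
        f2 <- fut2
      } yield SomeResult(f1, f2.toList)

      // sender() is evaluated now, on the actor's thread, not inside a callback
      aggFut pipeTo sender()

    case "bar" =>
      sender() ! 2
  }
}

class ChildActor extends Actor {
  def receive = {
    case "baz" =>
      sender() ! 1
  }
}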

Then you could test it with this code:

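import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(5.seconds)

val system = ActorSystem("foo")
val actor = system.actorOf(Props[ParentActor])
val result = actor ? "foo"

import system.dispatcher // ExecutionContext for the callback
result onComplete {
  case tr => println(tr)
}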

When you run this, it should print Success(SomeResult(2,List(1, 1))).

A couple of things here. First, using mapTo gives the futures known types, so you don't have to deal with Any. Second, pipeTo is a good option here: it avoids closing over the sender inside a callback, and it also simplifies the code a bit.
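To see what mapTo and the for comprehension do without an actor system, here is a minimal sketch that uses only scala.concurrent. The names MapToSketch, Aggregate, untypedStat and untypedReports are hypothetical stand-ins for SomeResult and the results of the asks, and the futures are completed immediately instead of by actors.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapToSketch {
  // Hypothetical stand-in for SomeResult in the answer above.
  final case class Aggregate(stat: Int, reports: List[Int])

  def combine(): Future[Aggregate] = {
    // An ask (?) yields Future[Any]; simulate that with completed futures.
    val untypedStat: Future[Any] = Future.successful(2)
    val untypedReports: List[Future[Any]] =
      List(Future.successful(1), Future.successful(1))

    // mapTo narrows Future[Any] to Future[Int]; the resulting future fails
    // with a ClassCastException if the value has another runtime type.
    val stat: Future[Int] = untypedStat.mapTo[Int]
    val reports: Future[List[Int]] =
      Future.traverse(untypedReports)(_.mapTo[Int])

    // Both futures already exist at this point, so the for comprehension
    // only combines their results; it does not run them one after another.
    for {
      s  <- stat
      rs <- reports
    } yield Aggregate(s, rs)
  }

  def main(args: Array[String]): Unit =
    println(Await.result(combine(), 1.second)) // Aggregate(2,List(1, 1))
}
```

Await.result is used here only so that the sketch can print its result; inside an actor you would pipe the future to the sender instead of blocking.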

OTHER TIPS

There is a trivial way to combine Futures. For example (without Akka):

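  import scala.concurrent.Promise
  import scala.concurrent.ExecutionContext.Implicits.global

  val promiseInt = Promise[Int]()
  val promiseString = Promise[String]()

  val futureInt = promiseInt.future
  val futureString = promiseString.future

  case class Special(i: Int, s: String)

  // foreach runs its body only on success; it replaces onSuccess,
  // which was deprecated in Scala 2.12 and removed in 2.13.
  futureInt.foreach { i =>
    futureString.foreach { s =>
      println(Special(i, s))
    }
  }

  promiseInt.success(3)
  promiseString.success("no")
  Thread.sleep(100) // give the callbacks time to run before the program exits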

The order in which the two futures are completed is irrelevant. You can try inverting the two success triggers and you will get the same result.

I am using Promise here only to build a running example; it has nothing to do with combining the Futures.
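The nested callbacks above only print the combined value. If the caller needs the combination as a Future of its own, for example to pipe it to an actor or to handle a failure of either input, zip followed by map is one option. This is a minimal sketch; ZipSketch and combine are hypothetical names introduced for illustration.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ZipSketch {
  final case class Special(i: Int, s: String)

  // Combines two futures into one Future[Special].
  // The result fails if either input future fails.
  def combine(fi: Future[Int], fs: Future[String]): Future[Special] =
    fi.zip(fs).map { case (i, s) => Special(i, s) }

  def main(args: Array[String]): Unit = {
    val promiseInt = Promise[Int]()
    val promiseString = Promise[String]()
    val combined = combine(promiseInt.future, promiseString.future)

    // Complete in the opposite order to show that ordering does not matter.
    promiseString.success("no")
    promiseInt.success(3)

    println(Await.result(combined, 1.second)) // Special(3,no)
  }
}
```

A for comprehension over the two futures would give the same result; zip just makes it explicit that neither future depends on the other.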

Licensed under: CC-BY-SA with attribution