Question

How should I handle an exception thrown by the DbActor here ? I'm not sure how to handle it, should pipe the Failure case ?

class RestActor extends Actor with ActorLogging {
  import context.dispatcher

  val dbActor = context.actorOf(Props[DbActor])
  implicit val timeout = Timeout(10 seconds)


  override val supervisorStrategy: SupervisorStrategy = {
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
      case x: Exception => ???
    }
  }

  def receive = {
    case GetRequest(reqCtx, id) => {

        // perform db ask
       ask(dbActor, ReadCommand(reqCtx, id)).mapTo[SomeObject] onComplete {
        case Success(obj) => { // some stuff }
        case Failure(err) => err match {
          case x: Exception => ???
        }
      }
    }
  }
}

Would be glad to get your thought, thanks in advance !

Was it helpful?

Solution

There are a couple of questions I can see here based on the questions in your code sample:

  1. What types of things can I do when I override the default supervisor behavior in the definition of how to handle exceptions?

  2. When using ask, what types of things can I do when I get a Failure result on the Future that I am waiting on?

Let's start with the first question first (usually a good idea). When you override the default supervisor strategy, you gain the ability to change how certain types of unhandled exceptions in the child actor are handled in regards to what to do with that failed child actor. The key word in that previous sentence is unhandled. For actors that are doing request/response, you may actually want to handle (catch) specific exceptions and return certain response types instead (or fail the upstream future, more on that later) as opposed to letting them go unhandled. When an unhandled exception happens, you basically lose the ability to respond to the sender with a description of the issue and the sender will probably then get a TimeoutException instead as their Future will never be completed. Once you figured out what you handle explicitly, then you can consider all the rest of exceptions when defining your custom supervisor strategy. Inside this block here:

OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
  case x: Exception => ???
}

You get a chance to map an exception type to a failure Directive, which defines how the failure will be handled from a supervision standpoint. The options are:

  1. Stop - Completely stop the child actor and do not send any more messages to it

  2. Resume - Resume the failed child, not restarting it thus keeping its current internal state

  3. Restart - Similar to resume, but in this case, the old instance is thrown away and a new instance is constructed and internal state is reset (preStart)

  4. Escalate - Escalate up the chain to the parent of the supervisor

So let's say that given a SQLException you wanted to resume and given all others you want to restart then your code would look like this:

OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
  case x: SQLException => Resume
  case other => Restart
}

Now for the second question which pertains to what to do when the Future itself returns a Failure response. In this case, I guess it depends on what was supposed to happen as a result of that Future. If the rest actor itself was responsible for completing the http request (let's say that httpCtx has a complete(statusCode:Int, message:String) function on it), then you could do something like this:

   ask(dbActor, ReadCommand(reqCtx, id)).mapTo[SomeObject] onComplete {
    case Success(obj) => reqCtx.complete(200, "All good!")
    case Failure(err:TimeoutException) => reqCtx.complete(500, "Request timed out")
    case Failure(ex) => reqCtx.complete(500, ex.getMessage)
  }

Now if another actor upstream was responsible for completing the http request and you needed to respond to that actor, you could do something like this:

   val origin = sender
   ask(dbActor, ReadCommand(reqCtx, id)).mapTo[SomeObject] onComplete {
    case Success(obj) => origin ! someResponseObject
    case Failure(ex) => origin ! Status.Failure(ex)
  }

This approach assumes that in the success block you first want to massage the result object before responding. If you don't want to do that and you want to defer the result handling to the sender then you could just do:

   val origin = sender
   val fut = ask(dbActor, ReadCommand(reqCtx, id))
   fut pipeTo origin

OTHER TIPS

For simpler systems one may want to catch and forward all of the errors. For that I made this small function to wrap the receive method, without bothering with supervision:

  import akka.actor.Actor.Receive
  import akka.actor.ActorContext
  /**
   * Meant for wrapping the receive method with try/catch.
   * A failed try will result in a reply to sender with the exception.
   * @example
   *          def receive:Receive = honestly {
   *            case msg => sender ! riskyCalculation(msg)
   *          }
   *          ...
   *          (honestActor ? "some message") onComplete {
   *            case e:Throwable => ...process error
   *            case r:_ => ...process result
   *          }
   * @param receive
   * @return Actor.Receive
   *
   * @author Bijou Trouvaille
   */
    def honestly(receive: =>Receive)(implicit context: ActorContext):Receive = { case msg =>
      try receive(msg) catch { case error:Throwable => context.sender ! error }
    }

you can then place it into a package file and import a la akka.pattern.pipe and such. Obviously, this won't deal with exceptions thrown by asynchronous code.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top