Question

I have a supervising Akka actor which uses a router to forward messages to worker actors.

I have a class that wraps the supervisor. When I call a method on that class, it "asks" the supervisor to do something, and I then use Await.result(theFuture) to wait for the result (I cannot continue without it).

If the workers throw an exception, I want to restart the worker which threw the exception, and I want the exception to be caught by the code which calls the wrapper class.

I passed a OneForOneStrategy to the router constructor, which returns Restart in the case of an Exception. In the postRestart method of the worker, I log the restart, so I can verify that the worker is actually restarted.

When the worker throws an exception, it gets restarted, but the exception disappears. The Future returned by asking the supervisor does contain an exception, but it is an akka.pattern.AskTimeoutException, which is thrown after just 5 seconds rather than after 20 seconds, which is the implicit timeout that I have in scope. The worker's exception actually occurs less than a second after the worker starts.

Question 1: how can I get the exception from the worker in the code which calls my wrapper class?

Also, the receive method of the worker is like this:

def receive = {
    case r: Request => 
        val response = ??? //throws an exception sometimes
        sender ! response
}

Something is logging the exception to the console, but it isn't my code. The stack trace is:

[ERROR] [02/11/2013 21:34:20.093] [MySystem-akka.actor.default-dispatcher-9] [akka://MySystem/user/MySupervisor/MyRouter/$a] Something went wrong!
    at myApp.Worker.$$anonfun$receive$1.applyOrElse(Source.scala:169)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:425)
    at akka.actor.ActorCell.invoke(ActorCell.scala:386)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230)
    at akka.dispatch.Mailbox.run(Mailbox.scala:212)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

Line 169 of Source.scala is the line val response = ??? shown in the listing of the receive method above.

Question 2: who is logging that exception to the console, and how can I stop it?

Solution

1)

try somethingThatCanFail() catch {
    case e: Exception => sender ! Status.Failure(e); throw e
}

Sending Status.Failure(e) to the sender completes the caller's Future with a failure containing the exception. Rethrowing e then triggers the OneForOneStrategy, which restarts the worker.
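To show both sides together, here is a minimal sketch of a worker that replies with Status.Failure before rethrowing, and of a wrapper that blocks on the ask. It assumes the classic (untyped) actor API and Scala 2; Request, compute, and Wrapper are hypothetical names standing in for the asker's own code, and the 20-second timeout is taken from the question.

```scala
import akka.actor.{Actor, ActorRef, Status}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message type, standing in for the question's Request.
case class Request(payload: String)

class Worker extends Actor {
  // Hypothetical stand-in for the work that sometimes throws.
  private def compute(r: Request): String =
    if (r.payload.isEmpty) throw new IllegalArgumentException("empty payload")
    else r.payload.toUpperCase

  def receive = {
    case r: Request =>
      try sender ! compute(r)
      catch {
        case e: Exception =>
          sender ! Status.Failure(e) // fails the asker's Future with e
          throw e                    // still reaches the supervisor strategy
      }
  }
}

// Hypothetical wrapper: blocks until the reply (or the failure) arrives.
class Wrapper(target: ActorRef) {
  implicit val timeout: Timeout = Timeout(20.seconds)

  def process(payload: String): String =
    Await.result((target ? Request(payload)).mapTo[String], timeout.duration)
}
```

With this in place, a call such as new Wrapper(supervisor).process("") should rethrow the worker's IllegalArgumentException at the call site instead of ending in an AskTimeoutException, provided the supervisor and router preserve the original sender (for example by using forward).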

2)

It is the actor system itself that logs the failure. The only way to quiet it down is to filter the log output by creating and configuring your own LoggingAdapter, as described here: http://doc.akka.io/docs/akka/2.1.0/scala/logging.html. There is a ticket for changing this (https://www.assembla.com/spaces/akka/tickets/2824), but it is targeted for Akka 2.2.
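One way to do that filtering, as far as I can tell from the linked logging page, is to register your own logger (event handler) actor. The following is only a sketch under that assumption: ExpectedFailure and QuietLogger are hypothetical names, and it assumes the supervisor's error event carries the original exception as its cause.

```scala
import akka.actor.Actor
import akka.event.Logging.{Error, InitializeLogger, LogEvent, LoggerInitialized}

// Hypothetical marker for failures the caller already receives via Status.Failure.
class ExpectedFailure(msg: String) extends RuntimeException(msg)

// Hypothetical logger actor: drops errors caused by ExpectedFailure,
// prints every other log event to the console.
class QuietLogger extends Actor {
  def receive = {
    case InitializeLogger(_) =>
      sender ! LoggerInitialized // handshake expected when the logger starts
    case e: Error if e.cause.isInstanceOf[ExpectedFailure] =>
      () // already reported to the caller, so stay silent
    case event: LogEvent =>
      println(s"[${event.level}] [${event.logSource}] ${event.message}")
  }
}
```

The actor would then be registered in the configuration. In Akka 2.1 the setting is, to my knowledge, akka.event-handlers = ["myApp.QuietLogger"], and later versions use akka.loggers for the same purpose; check the documentation for the version you run.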

Answered by https://groups.google.com/forum/#!topic/akka-user/fenCvYu3HYE

OTHER TIPS

To be notified when one of your children dies, you need to:

  • First, watch the child.
  • You will then be sent a Terminated() message, carrying a reference to the child, when it stops (a restart alone does not produce one).

Something like:

import akka.actor.{Actor, Props, Terminated}

class ParentActor extends Actor {
   // Sample of how to watch for the death of one of your children.
   val childActor = context.actorOf(Props[SomeService], "SomeService")
   // context.watch takes the ActorRef itself and returns that same ref.
   val dyingChild = context.watch(childActor)

   def receive = {
     case Terminated(`dyingChild`) =>
        println("dyingChild died")
     case Terminated(terminatedActor) =>
        println(s"This child just died $terminatedActor")
   }
 }

Hope this helps.

Licensed under: CC-BY-SA with attribution