Question

Stuff I need help with is in bold.

I have an actor that is flying multiple spray HttpRequests, the requests are paginated and the actor makes sure it writes the results in sequence into a database (sequence is important to resume crawlers). I explain this because I don't want to explore other patterns of concurrency at the moment. The actor needs to recover from timeouts without restarting.

in my actor I have the following :

            case f : Failure => {
                system.log.error("faiure")
                system.log.error(s"$f")
                system.shutdown()
            }
            case f : AskTimeoutException => {
                system.log.error("faiure")
                system.log.error(s"$f")
                system.shutdown()
            }
            case msg @ _ => {

                system.log.error("Unexpected message in harvest")
                system.log.error(s"${msg}")
                system.shutdown()
            }

but I can't match correctly :

[ERROR] [11/23/2013 14:58:10.694] [Crawler-akka.actor.default-dispatcher-3] [ActorSystem(Crawler)] Unexpected message in harvest
[ERROR] [11/23/2013 14:58:10.694] [Crawler-akka.actor.default-dispatcher-3] [ActorSystem(Crawler)] Failure(akka.pattern.AskTimeoutException: Timed out)

My dispatches look as follows :

abstract class CrawlerActor extends Actor {
  private implicit val timeout: Timeout = 20.seconds
  import context._
  def dispatchRequest(node: CNode) {
    val reqFut = (System.requester ? CrawlerRequest(node,Get(node.url))).map(r=> CrawlerResponse(node,r.asInstanceOf[HttpResponse]))
    reqFut pipeTo self
  }


class CrawlerRequester extends Actor  {
  import context._
  val throttler = context.actorOf(Props(classOf[TimerBasedThrottler],System.Config.request_rate),"throttler")
  throttler ! SetTarget(Some(IO(Http).actorRef))

  def receive : Receive = {
    case CrawlerRequest(type_,request) => {
      throttler forward request
    }
  }
}

Once I find the correct way of matching, is there anyway I can get my hands on the CrawlerRequest that the timeout occurred with ? it contains some state I need to figure out how to recover.

Was it helpful?

Solution 2

Need to type out the full path of the Failure case class, (or import it I guess).

case f: akka.actor.Status.Failure => {
                system.log.error("faiure")
                system.log.error(s"${f.cause}")
                system.shutdown()
            }

That just leaves getting to the request associated with the timeout. Seems a map and pipe with a custom failure handler is needed at point request dispatch. Looking into it now.

The following trampolines the timeout into the actor.

case class CrawlerRequestTimeout(request: CrawlerRequest)
abstract class CrawlerActor extends Actor {
  private implicit val timeout: Timeout = 20.seconds
  import context._
  def dispatchRequest(node: CNode) {
    val req =  CrawlerRequest(node,Get(node.url))
    val reqFut = (System.requester ? req).map(r=> CrawlerResponse(node,r.asInstanceOf[HttpResponse]))

    reqFut onFailure {
        case te: akka.pattern.AskTimeoutException => self ! CrawlerRequestTimeout(req)
    }
    reqFut pipeTo self
  }
}

with a match of :

 case timeout : CrawlerRequestTimeout => {
                println("boom")
                system.shutdown()
            }

Need to find a way of suppressing the exception though, it's still firing. Perhaps suppression isn't really a concern, verifying.

No, suppression is a concern, or the exception trickles down to the msg @ _, need to put in a case class to absorb the redundant failure message.

ok, so getting rid of the pipeto gets rid of the exception entering the client actor. It's also a lot easier to read :D

abstract class CrawlerActor extends Actor {
  private implicit val timeout: Timeout = 20.seconds
  import context._
  def dispatchRequest(node: CNode) {
    val req =  CrawlerRequest(node,Get(node.url))
    val reqFut = (System.requester ? req)

    reqFut onFailure {
        case te: akka.pattern.AskTimeoutException => self ! CrawlerRequestTimeout(req)
    }
    reqFut onSuccess {
        case r: HttpResponse => self ! CrawlerResponse(node,r)
    }
  }
}

OTHER TIPS

This situation occurs if you use pipeTo to respond to message that sent by tell.

For example:

in actorA: actorB ! message
in actorB: message => doStuff pipeTo sender
in actorA: receives not 'scala.util.Failure', but 'akka.actor.Status.Failure'

The additional logic in pipeTo is to transform Try's Failure into akka's actor Failure (akka.actor.Status.Failure). This works fine when you use ask pattern, because temporary ask actor handle akka.actor.Status.Failure for you, but does not work well with tell.

Hope this short answer helps :)

Good luck!

If I understand correctly, you currently don't succeed in matching the AskTimeoutException.

If so, you should match case Failure(AskTimeoutException) => ... instead of case f : AskTimeoutException => ....

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top