Question

I have an actor in Akka that will process messages to create certain entities. Some fields on these entities are computed based on the state of other entities in the database at the moment of creation.

I would like to avoid a race condition in which the actor processes messages faster than the database can persist the entities. This could lead to inconsistent data, for example:

  • Actor creates a Foo and sends it to other actors for further processing and saving
  • The actor is asked to create another Foo. Since the first one is not yet saved, the new one is created based on the old content of the DB, thereby creating a wrong Foo.

Now, this possibility is quite remote, since the creation of the Foos will be triggered manually. But it is still conceivable that a double click may cause problems under high load. And who knows whether Foos will be created automatically in the future.

Hence, what I need is some way to tell the actor to wait, and resume its operations only after confirmation that the Foos have been saved.

Is there a way to put an actor in an idle state, and tell it to resume its operations after a while?

Basically, I would like to use the mailbox as a message queue, and have control over the processing speed of the queue.

Solution

No, you cannot suspend an actor: actors always pull messages from their mailbox as quickly as possible. This leaves only the possibility that incoming requests are stashed away, to be processed later:

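import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.actor.{Actor, ActorRef, ReceiveTimeout, Stash}

// Request, Persist and Persisted are assumed to be message objects defined elsewhere;
// doWork() stands for the actual creation logic.
class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      context.setReceiveTimeout(5.seconds)
      context.become({
        case Request        => stash() // hold further requests until the save is confirmed
        case Persisted      =>
          context.setReceiveTimeout(Duration.Undefined) // switch the timeout off again
          context.unbecome()
          unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}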

Please note that message delivery is not guaranteed (and the database may be down), so using a timeout is recommended practice.

The underlying problem

This problem mostly shows up when the actor model and the domain model are not well aligned: the actor is the unit of consistency, but in your use case a consistent view also depends on an up-to-date external entity (the database), which the actor needs in order to do the right thing. I cannot recommend a solution without knowing more about the use case, but try to remodel your problem with this in mind.

OTHER TIPS

It turns out that this only requires a few lines. This is the solution I came up with, which agrees with pagoda_5b's suggestion:

import akka.actor.{Actor, ActorRef, Stash}

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      context.become({
        case Resume =>
          unstashAll()
          context.unbecome()
        case _ => stash()
      })
      nextActor ! message
  }
}

object QueueingActor {
  // A case object, so that the `case Resume` pattern in the actor matches the message that is sent.
  case object Resume
}
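
For completeness, here is a minimal sketch of how the downstream side might signal the queue to continue. SavingActor, its save method and the Demo wiring are hypothetical names that are not part of the code above, and the example assumes the classic akka-actor library is on the classpath. Because QueueingActor forwards with `nextActor ! message` from inside the actor, `sender()` in the downstream actor is the QueueingActor itself, so the reply can go straight back to it.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical downstream actor: persists the entity, then lets the queue continue.
class SavingActor extends Actor {
  def receive = {
    case entity =>
      save(entity)                     // placeholder for the real persistence call
      sender() ! QueueingActor.Resume  // sender() is the QueueingActor that forwarded the message
  }

  // Placeholder; a real implementation would write to the database.
  private def save(entity: Any): Unit = ()
}

object Demo extends App {
  val system = ActorSystem("demo")
  val saver  = system.actorOf(Props(new SavingActor), "saver")
  val queue  = system.actorOf(Props(new QueueingActor(saver)), "queue")

  queue ! "foo-1"
  queue ! "foo-2" // stashed if the save of foo-1 has not been confirmed yet
}
```

If the save can fail or the reply can get lost, the queue stays in its stashing state forever, so a receive timeout like the one in the accepted answer is probably still worth adding.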
Licensed under: CC-BY-SA with attribution