Question

I have an Actor that receives metrics datapoints and periodically aggregates and persists them to disk. The latter operation does I/O, so I don't want to use a blocking operation. But if I switch it to asynchronous, how do I prevent other datapoints from being received before the aggregation is completed, without blocking somewhere?

One pattern I've seen is to use Stash, something like this:

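import akka.actor.{Actor, Stash}
import scala.concurrent.Future
import scala.util.{Failure, Success}

class Aggregator extends Actor with Stash {
  import context.dispatcher // ExecutionContext needed by onComplete

  def receive = processing

  // Placeholder for the asynchronous aggregate-and-persist step
  def aggregate(): Future[Unit] = ???

  def processing: Receive = {
    case "aggregate" =>
      context.become(aggregating)
      // Notify self when the Future completes, whether it succeeded or failed
      aggregate().onComplete {
        case Success(_) => self ! "aggregated"
        case Failure(_) => self ! "aggregated"
      }
    case msg => ??? // Process task
  }

  def aggregating: Receive = {
    case "aggregated" =>
      // Replay everything that arrived during aggregation, then resume
      unstashAll()
      context.become(processing)
    case msg =>
      stash()
  }
}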

The misgiving I have with this is that the completion of my aggregate action is simply a message anyone could send. As I understand it, I cannot effect the "unbecoming" from within my Future's completion.

As a side note, I have not been able to determine whether completions like onComplete are somehow executed by the same dispatcher as receive, since if they are not, completions would break the single-threaded protections that actors otherwise offer.

Or is there a better pattern for completing actions that are not synchronous and immediate inside of receive while guaranteeing that my state cannot be altered until I complete? It seems this scenario arises any time actor state deals with I/O of any kind (like a DB), and clearly you would want to avoid synchronous I/O if you can.


Solution

Your aggregator actor is currently doing two things: aggregating and storing. You can both solve your issue and simplify your system by splitting these two tasks. The single-responsibility principle also applies to actors.

I'd create a dedicated actor for writing and a message class for holding the aggregated data. This actor subsystem should look like this:

[Figure: Aggregator-Store actor subsystem]

Ideally, the time it takes to write to disk is shorter than the aggregation interval, so that your system remains stable. In case of spikes, the DataStore actor's queue will serve as a buffer for messages to be written to storage.
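A minimal sketch of this split, assuming classic Akka actors. The message types `DataPoint`, `Aggregate`, and `AggregatedData`, the `Aggregator.add` helper, and the `write` placeholder are hypothetical names chosen for illustration; none of them come from the question.

```scala
import akka.actor.{Actor, ActorRef}

// Hypothetical message types, not from the original post.
final case class DataPoint(name: String, value: Double)
case object Aggregate
final case class AggregatedData(sums: Map[String, Double])

object Aggregator {
  // Pure aggregation step, kept outside the actor so it is easy to test.
  def add(sums: Map[String, Double], p: DataPoint): Map[String, Double] =
    sums.updated(p.name, sums.getOrElse(p.name, 0.0) + p.value)
}

// Only aggregates in memory; never touches the disk.
class Aggregator(store: ActorRef) extends Actor {
  private var sums = Map.empty[String, Double]

  def receive: Receive = {
    case p: DataPoint =>
      sums = Aggregator.add(sums, p)
    case Aggregate =>
      // Hand an immutable snapshot to the store and start a fresh interval.
      store ! AggregatedData(sums)
      sums = Map.empty
  }
}

// Owns the disk write; pending snapshots wait in its mailbox.
class DataStore extends Actor {
  def receive: Receive = {
    case AggregatedData(sums) => write(sums)
  }

  // Placeholder (assumption): replace with the real persistence code.
  private def write(sums: Map[String, Double]): Unit = ()
}
```

Because the snapshot is sent and the state reset within a single message handler, the Aggregator needs neither `Stash` nor `context.become`: datapoints that arrive while the write is in progress simply count toward the next interval. If `write` blocks, it may be worth running `DataStore` on a dedicated dispatcher via `Props(...).withDispatcher(...)`, with the dispatcher id defined in your own configuration.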

Depending on your application, you might need to implement some form of ack & retries in case you want to ensure that aggregated data has been written.
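One possible shape for such an ack-and-retry scheme, again as a sketch under assumptions rather than a definitive implementation. The names `Persist`, `Write`, `WriteAck`, `RetryPending`, `ReliableWriter`, and `AckingDataStore` are hypothetical, and the retry policy (resend everything unacknowledged after a fixed delay) is only one of several options.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration.FiniteDuration

// Hypothetical protocol, not from the original post.
final case class Persist(sums: Map[String, Double])          // request from the aggregator
final case class Write(id: Long, sums: Map[String, Double])  // sent to the store
final case class WriteAck(id: Long)                          // reply from the store
case object RetryPending

class AckingDataStore extends Actor {
  def receive: Receive = {
    case Write(id, sums) =>
      write(id, sums)          // placeholder persistence call
      sender() ! WriteAck(id)  // confirm only after the write returned
  }

  // Placeholder (assumption): should be idempotent per id, since retries can duplicate.
  private def write(id: Long, sums: Map[String, Double]): Unit = ()
}

class ReliableWriter(store: ActorRef, retryAfter: FiniteDuration) extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  private var pending = Map.empty[Long, Write] // sent but not yet acknowledged
  private var nextId = 0L
  private var retryTimer: Option[Cancellable] = None

  override def preStart(): Unit = scheduleRetry()
  override def postStop(): Unit = retryTimer.foreach(_.cancel())

  private def scheduleRetry(): Unit =
    retryTimer = Some(context.system.scheduler.scheduleOnce(retryAfter, self, RetryPending))

  def receive: Receive = {
    case Persist(sums) =>
      val w = Write(nextId, sums)
      nextId += 1
      pending += (w.id -> w)
      store ! w
    case WriteAck(id) =>
      pending -= id
    case RetryPending =>
      pending.values.foreach(store ! _) // resend everything still unacknowledged
      scheduleRetry()
  }
}
```

With this scheme a write can be delivered more than once, for example when the ack is slower than `retryAfter`, so the store has to tolerate duplicate ids. The pending map also lives only in memory, so it does not survive a restart of `ReliableWriter`.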

Licensed under: CC-BY-SA with attribution