Question

I have an Akka actor that performs an action that takes some time to complete, because it has to download a file from the network.

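  // Assumes an implicit ExecutionContext in scope for Future, e.g. import context.dispatcher
  def receive = {
    case songId: String => {
      // The Future runs on another thread, so receive returns immediately
      Future {
        val futureFile = downloadFile(songId)

        for (file <- futureFile) {
          val fileName = doSomethingWith(file)
          otherActor ! fileName
        }
      }
    }
  }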

I would like to control the flow of messages to this actor. If I try to download too many files simultaneously, I hit a network bottleneck. The problem is that I am using a Future inside the actor's receive, so the method exits and the actor is immediately ready to process a new message. If I remove the Future, I will download only one file at a time.

What is the best way to limit the number of messages being processed per unit of time? Is there a better way to design this code?
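The behaviour described in the question can be reproduced with the standard library alone: the body of a Future runs on another thread, so the code that creates it returns straight away. In this minimal sketch, handle is a hypothetical stand-in for the receive body, and a CountDownLatch holds the simulated download back so that the ordering is deterministic.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingHandler {
  val events = new ConcurrentLinkedQueue[String]()
  val downloadMayFinish = new CountDownLatch(1)

  // Hypothetical stand-in for the receive body: starts the "download" and returns at once
  def handle(songId: String): Future[Unit] = Future {
    downloadMayFinish.await() // simulated slow network transfer
    events.add(s"downloaded $songId")
    ()
  }

  def demo(): List[String] = {
    val pending = handle("song-1")
    events.add("handler returned") // reached while the download is still running
    downloadMayFinish.countDown()  // let the simulated download complete
    Await.result(pending, 5.seconds)
    events.toArray(Array.empty[String]).toList
  }
}
```

Because handle returns before its Future finishes, nothing stops the actor from starting another download for the next message, which is exactly why the number of parallel downloads is unbounded.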


Solution 3

There is a contrib implementation of message throttling (TimerBasedThrottler, in the akka-contrib module), as described in the akka-contrib documentation.

The code is very simple:

import akka.actor.{Actor, ActorSystem, Props}
import akka.contrib.throttle.Throttler._ // SetTarget and the msgsPer rate syntax
import akka.contrib.throttle.TimerBasedThrottler
import scala.concurrent.duration._

// A simple actor that prints whatever it receives
class Printer extends Actor {
  def receive = {
    case x => println(x)
  }
}

val system = ActorSystem("throttler-example")
val printer = system.actorOf(Props[Printer], "printer")

// The throttler for this example, setting the rate to 3 messages per second
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second))

// Set the target
throttler ! SetTarget(Some(printer))
// These three messages will be sent to the printer immediately
throttler ! "1"
throttler ! "2"
throttler ! "3"
// These two will wait at least until 1 second has passed
throttler ! "4"
throttler ! "5"
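To make the "3 msgsPer 1.second" semantics concrete, here is a minimal, stdlib-only sketch of a fixed-window rate check. It is not the TimerBasedThrottler source, only an approximation of the same idea; the class FixedWindowLimiter and its tryAcquire method are hypothetical, and the current time is passed in explicitly (in milliseconds) so the behaviour is deterministic.

```scala
// Hypothetical fixed-window limiter: at most maxPerWindow messages are let
// through per window of windowMillis milliseconds (windows aligned to time 0).
// Like actor state, it is not thread-safe; call it from a single thread.
final class FixedWindowLimiter(maxPerWindow: Int, windowMillis: Long) {
  private var windowStart = 0L
  private var sentInWindow = 0

  /** Returns true if a message may be sent at time nowMillis. */
  def tryAcquire(nowMillis: Long): Boolean = {
    if (nowMillis - windowStart >= windowMillis) {
      // A new window has started: align it and reset the counter
      windowStart = (nowMillis / windowMillis) * windowMillis
      sentInWindow = 0
    }
    if (sentInWindow < maxPerWindow) {
      sentInWindow += 1
      true
    } else {
      false // the caller has to hold this message until a later window
    }
  }
}
```

With maxPerWindow = 3 and windowMillis = 1000, five messages arriving at time 0 give true, true, true, false, false; the last two are admitted once the window starting at 1000 ms opens, matching the comments on messages "4" and "5".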

Other tips

There is a contrib project for Akka that provides a throttle implementation (http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2). If you place it in front of the actual download actor, you can effectively throttle the rate of messages going into that actor. It's not perfect: it limits how many downloads start per unit of time, not how many run at once, so if downloads take longer than expected you could still end up with more simultaneous downloads than desired. Still, it's a pretty simple implementation, and we use it quite a bit to great effect.
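A back-of-the-envelope way to see that caveat is Little's law (an addition here, not something the answer cites): if the throttler admits λ downloads per second and each download takes W seconds on average, then in steady state the average number of downloads in flight is

```latex
L = \lambda W
\qquad\text{e.g.}\qquad
\lambda = 3\ \mathrm{s}^{-1},\; W = 5\ \mathrm{s}
\;\Rightarrow\; L = 3 \times 5 = 15
```

The 5-second download time is purely illustrative. The throttler fixes λ, but L still grows with W, so slower downloads mean more simultaneous downloads at the same message rate.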

Another option is to use a pool of download actors, remove the Future, and let the actors block so that each one truly handles only one message at a time. Because they are going to block, I would suggest giving them their own dispatcher (ExecutionContext) so that the blocking does not negatively affect Akka's default dispatcher. If you do this, the pool size itself is your maximum number of simultaneous downloads.
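The key property of that design, that the size of a dedicated pool bounds the number of simultaneous blocking downloads, can be sketched without Akka using a plain JDK fixed thread pool wrapped as an ExecutionContext. This is a stand-in for a pool of actors on their own dispatcher, not Akka configuration; blockingDownload is a hypothetical placeholder that just sleeps, and maxConcurrent records the peak number of downloads running at once.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedDownloads {
  private val running = new AtomicInteger(0)
  val maxConcurrent = new AtomicInteger(0)

  // Hypothetical blocking download: occupies its thread for a while
  def blockingDownload(songId: String): String = {
    val now = running.incrementAndGet()
    maxConcurrent.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
    try {
      Thread.sleep(50) // simulate network I/O that blocks the thread
      s"$songId.mp3"
    } finally running.decrementAndGet()
  }

  def downloadAll(songIds: Seq[String], poolSize: Int): Seq[String] = {
    // Dedicated pool: blocking here cannot starve any other ExecutionContext
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val blockingEc: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val all = Future.sequence(songIds.map(id => Future(blockingDownload(id))))
      Await.result(all, 30.seconds)
    } finally {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

Submitting twelve downloads with poolSize = 3 never runs more than three at once; the remaining tasks wait in the executor's queue, much as messages would wait in the mailboxes of a three-actor pool.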

Both of these solutions are pretty much "out-of-the-box" solutions that don't require much custom logic to support your use case.

Edit

I also thought it would be good to mention the Work Pulling Pattern. With this approach you could still use a pool, with a single work distributor in front of it. Each worker (download actor) could perform the download (still using a Future) and only request new work (pull) from the work distributor once that Future has completed, meaning the download is done.
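The pull-based idea can be sketched with plain Futures: each of the workers takes the next item from a shared queue only after its previous item's Future has completed, so at most that many downloads are ever in flight. This is a non-actor approximation of the Work Pulling Pattern; WorkPulling, runPulling and the work function are hypothetical names, and a real Akka version would replace the shared queue with a distributor actor that answers explicit work requests.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}

object WorkPulling {
  /** Processes items with at most `workers` in flight at once. A worker pulls
    * its next item only when its previous Future has completed. Results are
    * grouped by worker, not returned in input order. */
  def runPulling[A, B](items: Seq[A], workers: Int)(work: A => Future[B])(
      implicit ec: ExecutionContext): Future[Seq[B]] = {
    // Shared queue playing the role of the work distributor
    val queue = new ConcurrentLinkedQueue[A]()
    items.foreach(item => queue.add(item))

    def worker(done: List[B]): Future[List[B]] =
      Option(queue.poll()) match {
        case None       => Future.successful(done.reverse) // no work left
        case Some(item) => work(item).flatMap(b => worker(b :: done)) // pull again only after completion
      }

    Future.sequence(List.fill(workers)(worker(Nil))).map(_.flatten)
  }
}
```

Compared with the throttler, this bounds concurrency directly: a slow download simply delays that worker's next pull instead of letting more downloads start.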

If you have an upper bound on the number of simultaneous downloads you want, you can 'ack' back to the actor to say that a download has completed, freeing up a slot to download another file:

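import akka.actor.{Actor, ActorRef}
import scala.concurrent.duration._

case object AckFileRequest

// downloadFile and doSomethingWith are the asker's own methods (not shown);
// downloadFile is assumed to return a Future of the file, as in the question
class ActorExample(otherActor: ActorRef, maxFileRequests: Int = 1) extends Actor {
  import context.dispatcher // ExecutionContext for Future callbacks and the scheduler

  var fileRequests = 0 // downloads currently in flight

  def receive = {
    case songId: String if fileRequests < maxFileRequests =>
      fileRequests += 1
      val thisActor = self
      val futureFile = downloadFile(songId)
      // Ack only once the download has finished (successfully or not);
      // the counter itself is only ever changed inside receive
      futureFile.onComplete(_ => thisActor ! AckFileRequest)

      for (file <- futureFile) {
        val fileName = doSomethingWith(file)
        otherActor ! fileName
      }
    case songId: String =>
      // Too many downloads in flight: retry this songId in one second
      context.system.scheduler.scheduleOnce(1.second, self, songId)
    case AckFileRequest => fileRequests -= 1
  }
}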

In this example, if too many file requests are in flight, we put this songId request on hold and send it back to the actor for processing one second later. You can change this however you see fit: you could send the message straight back to the actor in a tight loop, or use some other throttling strategy, depending on your use case.
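One such alternative, sketched here as plain Scala rather than actor code, is to park excess song IDs in a local FIFO queue and start the next one as soon as an ack frees a slot, instead of re-sending them to the actor after a delay. SlotLimiter, submit and ack are hypothetical names; inside an actor, submit would be called for each songId message and ack for each AckFileRequest, and any ID they return is one to start downloading now. Like actor state, it assumes single-threaded access.

```scala
import scala.collection.mutable

// Hypothetical bookkeeping for "at most maxInFlight downloads at once"
final class SlotLimiter(maxInFlight: Int) {
  private var inFlight = 0
  private val pending = mutable.Queue.empty[String]

  /** Returns Some(songId) if the download may start now; otherwise queues it. */
  def submit(songId: String): Option[String] =
    if (inFlight < maxInFlight) { inFlight += 1; Some(songId) }
    else { pending.enqueue(songId); None }

  /** Called when a download finishes; returns the next queued songId to start, if any. */
  def ack(): Option[String] =
    if (pending.nonEmpty) Some(pending.dequeue()) // the freed slot goes straight to the next id
    else { if (inFlight > 0) inFlight -= 1; None }

  def pendingCount: Int = pending.size
}
```

Unlike the scheduleOnce retry, this keeps request order and starts a waiting download immediately when a slot frees up, at the cost of holding the pending IDs in the actor's memory.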

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow