Question

While the errors of my SRP ways were corrected yesterday, I am still wondering how you would cleanly guarantee single-threaded access to an async resource in akka, such as a file handle. Clearly I don't want to allow multiple read and write operations to be dispatched against it from different threads, but if my actor calls a future based API on that file, that's what would likely happen.

The best pattern I've come up with is this:

trait AsyncIO {
  def Read(offset: Int, count: Int) : Future[ByteBuffer] = ???
}

object GuardedIOActor {
  case class Read(offset: Int, count: Int)
  case class ReadResult(data: ByteBuffer)
  private case class ReadCompleted()
}

class GuardedIOActor extends Actor with Stash with AsyncIO {
  import GuardedIOActor._
  var caller :Option[ActorRef] = None

  def receive = {
    case Read(offset,count) => caller match {
      case None => {
        caller = Some(sender)
        Read(offset,count).onSuccess({
          case data => {
            self ! ReadCompleted()
            caller.get ! ReadResult(data)
          }
        })
      }
      case Some(_) => {
        stash()
      }
    }
    case ReadCompleted() => {
      caller = None
      unstashAll()
    }
  }
}

But this requirement cannot be esoteric enough for me to roll that kind of kludge. I mean there ought to be plenty of resources that need synchronized access but have an async API. Am I overlooking some named pattern that is common?

Was it helpful?

Solution

I think the gist of your solution is not so bad, but you can make your actor behave more like a state-machine by using context.become:

class GaurdedIOActor extends Actor with Stash with AsyncIO {
  import GuardedIOActor._

  def receive = notReading

  def notReading: Receive = {
    case Read(offset, count) => {
      val caller = sender
      Read(offset,count).onSuccess({
        case data => {
          self ! ReadCompleted()
          caller ! ReadResult(data)
        }
      })
      context.become(reading)
    }
  }

  def reading: Receive = {
    case r: Read => stash()
    case ReadCompleted() => {
      context.become(notReading)
      unstashAll()
    }
  }
}

Now your actor has two well-defined states and there's no need for the var

OTHER TIPS

I realize this addition is a year late, however this question helped me reason about a similar situation I ran into. The following trait encapsulates the functionality suggested above to reduce boilerplate in actors. It can be mixed in with any actor with Stash. Usage is similar to the pipeTo pattern; just type future.pipeSequentiallyTo(sender) and your actor will not process messages until future has finished and a response has been sent.

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.language.implicitConversions
import scala.util.Failure
import scala.util.Success

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Stash
import akka.actor.Status

trait SequentialPipeToSupport { this: Actor with Stash =>

  case object ProcessingFinished

  def gotoProcessingState() = context.become(processingState)

  def markProcessingFinished() = self ! ProcessingFinished

  def processingState: Receive = {
    case ProcessingFinished =>
      unstashAll()
      context.unbecome()
    case _ =>
      stash()
  }

  final class SequentialPipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {

    def pipeSequentiallyTo(recipient: ActorRef): Future[T] = {

      gotoProcessingState()

      future onComplete {
        case Success(r) =>
          markProcessingFinished()
          recipient ! r
        case Failure(f) =>
          markProcessingFinished()
          recipient ! Status.Failure(f)
      }

      future
    }
  }

  implicit def pipeSequentially[T](future: Future[T])(implicit executionContext: ExecutionContext) = 
    new SequentialPipeableFuture(future)

}

Also available as a Gist

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top