문제

Update: Source code for this question can be found here. Courtesy of danluu.

Is there a standard way of replacing actorFor now that it has been deprecated? Everything I found involves actorSelection - but getting 3 dependencies with setup with a selection is a real pita. Most of the discussion online just says not to use actorFor, but all the books on akka still make use of it (since they target pre-2.2 akka).

For example, let's say I have two actors Pilot and Copilot, both children of the Plane actor. Here is how they currently get a reference to each other (Akka Concurrency, Derek Wyatt, 2013):

// Plane.scala
...
override def preStart(){
    // Get our children going.

    //... create pilot/copilot (in a separate function)
    startPeople()

    // Tell them system is ready to go
    context.actorFor("Pilots/" + pilotName) ! ReadyToGo
    context.actorFor("Pilots/" + copilotName) ! ReadyToGo

// Pilot.scala
def receive = {
    case ReadyToGo =>
      val copilotName = context.system.settings.config.getString("zzz.akka.avionics.flightcrew.copilotName")
      copilot = context.actorFor("../" + copilotName)

// Copilot.scala
def receive = {
    case ReadyToGo =>
      pilot = context.actorFor("../" + pilotName)
      // watch out for pilot dying - take over control when he does
      context.watch(pilot)

    case Controls(ctr) =>
      controls = ctr

    case Terminated(_) =>
      // Pilot died!
      plane ! GiveMeControl

How would I do this without actorFor? Oh, and DI contructor injection would not work here - since both pilot and copilot need to know about each other.

Thanks.

P.S.: Here is the content of startPeople() method, not sure if it matters:

 def startPeople() {
    val plane = self

    val controls: ActorRef = actorForControls("ControlSurfaces")
    val autopilot: ActorRef = actorForControls("Autopilot")
    val altimeter: ActorRef = actorForControls("Altimeter")

    val people = context.actorOf(Props(new IsolatedStopSupervisor with OneForOneStrategyFactory
    {
      override def childStarter() {
        // These children get implicitly added to the hierarchy
        context.actorOf(Props(newCopilot(plane, autopilot, altimeter)), copilotName)
        context.actorOf(Props(newPilot(plane, autopilot, controls, altimeter)),  pilotName)
      }
    }), "Pilots")

    // Use the default strategy here, which restarts indefinitely
    context.actorOf(Props(newLeadFlightAttendant), attendantName)
    Await.result(people ? WaitForStart, 1.second)
  }

Update: Solved

Thanks to Derek Wyatt (author) for the great answer. As an example, here is how I used Futures with map notation, in oneCopilot actor to acquire a dependency on Pilot actor:

package zzz.akka.avionics

import akka.actor._
import zzz.akka.avionics.Pilots.ReadyToGo
import zzz.akka.avionics.Plane.GiveMeControl
import akka.actor.Terminated
import zzz.akka.avionics.Plane.Controls
import scala.concurrent.Future
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.pipe

/** Copilot is a fail-over for the pilot.
 * Created by Andriy Drozdyuk on 05-Apr-14.
 */
class Copilot(plane: ActorRef, autopilot: ActorRef, altimeter: ActorRef) extends Actor with Stash {
  // Implicit execution context for futures
  implicit val ec = context.dispatcher
  // Implicit timeout for getting dependencies
  implicit val timeout = Timeout(1.second)

  val conf = context.system.settings.config
  var pilotName = conf.getString("zzz.akka.avionics.flightCrew.pilotName")

  var controls: ActorRef = context.system.deadLetters

  // Helps us get pilot dependency
  trait PilotAcquisition
  case class PilotAcquired(pilot: ActorRef) extends PilotAcquisition
  case class PilotNotAcquired(t: Throwable) extends PilotAcquisition

  // Acquire the pilot
  // Send either PilotAcquired or PilotNotAcquired message to self
  acquirePilot pipeTo self

  def acquirePilot: Future[PilotAcquisition] = {
    context.actorSelection("../" + pilotName).resolveOne() map {
      pilot => PilotAcquired(pilot)
    } recover {
      case t:Throwable => PilotNotAcquired(t)
    }
  }
  def receive: Receive = waitingForDependencies

  def waitingForDependencies: Receive = {
    case PilotAcquired(pilot) =>
      // Get all the messages we stashed and receive them
      unstashAll()
      // pass all our acquired dependencies in
      context.become(operational(pilot))

    case PilotNotAcquired(t) => throw new IllegalStateException(s"Failed to instantiate: $t")

    // Any other message save for later
    case _ => stash()
  }

  // All our dependencies have been acquired
  def operational(pilot: ActorRef) : Receive = {
    case ReadyToGo =>
      // Start watch on the pilot in case he kicks it
      context.watch(pilot)

    case Controls(ctr) =>
      controls = ctr

    case Terminated(_) =>
      // Pilot died!
      plane ! GiveMeControl

  }


}
도움이 되었습니까?

해결책

ActorSelection is really the way to replace actorFor. However, as I've already stated in the book, using Actor paths or selections can make your app quite brittle.

context.actorSelection("../someactor") ! ReadyToGo
context.actorSelection(self / "someChild" / "someGrandchild") ! ReadyToGo

Change your actor hierarchy, and your app starts failing. However, at least with ActorSelection you get some timeout errors instead of just getting things sent to the dead letter office.

Now, if you want to start grabbing references, that's a different story and I suggest doing it with Futures.

import akka.actor._
import akka.pattern.pipe

class MyActor extends Actor with Stash {
  val actors = for {
    actorA <- ActorSelection(someRootActor / List("child1", "grandchild", "greatGrandchild")).resolveOne()
    actorB <- ActorSelection(someRootActor / List("child2")).resolveOne()
    actorC <- ActorSelection(someOtherRootActor / List("childC")).resolveOne()
  } yield ActorsLocated(actorA, actorB, actorC)
  actors recover {
    case t: Throwable =>
      ActorLocatingFailed(t)
  } pipeTo self

  val uninitialized: Receive = {
    case ActorsLocated(a, b, c) =>
      unstashAll()
      context.become(initialized(a, b, c))
    case ActorLocatingFailed(reason) =>
      throw new IllegalStateException(s"Failed to initialize: $reason")
    case _ =>
      stash()
  }

  def initalized(a: ActorRef, b: ActorRef, c: ActorRef): Receive = {
    // do your stuff here
  }

  def receive = uninitialized
}

Now you have a fully asynchronous startup for your Actor that grabs all of its dependencies properly. On restart, you're good to go.

다른 팁

I think you have two possibilities:

"Injecting" the actors via message (e.g. ReadyToGo):

coPilotActor ! ReadyToGo(pilotActor)
pilotActor ! ReadyToGo(copilotActor)

Using ActorSelection:

 context.actorSelection(actorPath) ! Identify(<correlation_id>)
 // ...
 // response:
 case ActorIdentity(<correlation_id>, None) => // actor ref not found
 // or
 case ActorIdentity(<correlation_id>, Some(ref)) => // actor ref found

Because you are creating the two actors in plane, maybe you find a solution where you don't have to use actor selection in this case.

라이센스 : CC-BY-SA ~와 함께 속성
제휴하지 않습니다 StackOverflow
scroll top